Skip to content

Commit

Permalink
network address refactoring (#129)
Browse files Browse the repository at this point in the history
Moved out the address package to a separate repository for reuse in message storage (upcoming). Few things here:

* There's a breaking change, public alias now retrieves actual public IP address from the network interface and not an external one, added an external alias to retrieve the external address. Config defaults now to external address (same behavior, different name)
* Improved acceptance test with unsubscribe, ping and disconnect.
  • Loading branch information
kelindar authored May 29, 2018
1 parent 36d0f2e commit b2f0068
Show file tree
Hide file tree
Showing 20 changed files with 515 additions and 264 deletions.
4 changes: 2 additions & 2 deletions broker/cluster/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"sync"
"time"

"github.com/emitter-io/address"
"github.com/emitter-io/emitter/async"
"github.com/emitter-io/emitter/broker/message"
"github.com/emitter-io/emitter/config"
"github.com/emitter-io/emitter/network/address"
"github.com/emitter-io/emitter/provider/logging"
"github.com/emitter-io/emitter/security"
"github.com/weaveworks/mesh"
Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *Swarm) Close() error {

// getLocalPeerName retrieves or generates a local node name.
func getLocalPeerName(cfg *config.ClusterConfig) mesh.PeerName {
peerName := mesh.PeerName(address.Hardware())
peerName := mesh.PeerName(address.GetHardware())
if cfg.NodeName != "" {
if name, err := mesh.PeerNameFromString(cfg.NodeName); err != nil {
logging.LogError("swarm", "getting node name", err)
Expand Down
4 changes: 2 additions & 2 deletions broker/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"sync/atomic"
"time"

"github.com/emitter-io/address"
"github.com/emitter-io/emitter/broker/message"
"github.com/emitter-io/emitter/network/address"
"github.com/emitter-io/emitter/network/mqtt"
"github.com/emitter-io/emitter/provider/contract"
"github.com/emitter-io/emitter/provider/logging"
Expand Down Expand Up @@ -57,7 +57,7 @@ func (s *Service) newConn(t net.Conn) *Conn {
}

// Generate a globally unique id as well
c.guid = c.luid.Unique(uint64(address.Hardware()), "emitter")
c.guid = c.luid.Unique(uint64(address.GetHardware()), "emitter")
logging.LogTarget("conn", "created", c.guid)

// Increment the connection counter
Expand Down
23 changes: 9 additions & 14 deletions broker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
"syscall"
"time"

"github.com/emitter-io/address"
"github.com/emitter-io/emitter/broker/cluster"
"github.com/emitter-io/emitter/broker/message"
"github.com/emitter-io/emitter/config"
"github.com/emitter-io/emitter/network/address"
"github.com/emitter-io/emitter/network/listener"
"github.com/emitter-io/emitter/network/websocket"
"github.com/emitter-io/emitter/provider/contract"
Expand Down Expand Up @@ -143,7 +143,6 @@ func NewService(cfg *config.Config) (s *Service, err error) {
logging.LogTarget("service", "configured monitoring sink", s.monitor.Name())

// Addresses and things
logging.LogTarget("service", "configured external address", address.External())
logging.LogTarget("service", "configured node name", address.Fingerprint(s.LocalName()).String())
return s, nil
}
Expand All @@ -154,7 +153,7 @@ func (s *Service) LocalName() uint64 {
return s.cluster.ID()
}

return uint64(address.Hardware())
return uint64(address.GetHardware())
}

// NumPeers returns the number of peers of this service.
Expand Down Expand Up @@ -186,7 +185,7 @@ func (s *Service) Listen(ctx context.Context) (err error) {
}

// Setup the listeners on both default and a secure addresses
s.listen(s.Config.ListenAddr, nil)
s.listen(s.Config.Addr(), nil)
if tls, tlsValidator, ok := s.Config.Certificate(); ok {

// If we need to validate certificate, spin up a listener on port 80
Expand All @@ -195,7 +194,9 @@ func (s *Service) Listen(ctx context.Context) (err error) {
go http.ListenAndServe(":80", tlsValidator)
}

s.listen(s.Config.TLS.ListenAddr, tls)
if tlsAddr, err := address.Parse(s.Config.TLS.ListenAddr, 443); err == nil {
s.listen(tlsAddr, tls)
}
}

// Block
Expand All @@ -204,17 +205,11 @@ func (s *Service) Listen(ctx context.Context) (err error) {
}

// listen configures an main listener on a specified address.
func (s *Service) listen(addr string, conf *tls.Config) {

// Parse the listen address
listenAddr, err := address.Parse(addr, 0)
if err != nil {
panic(err)
}
func (s *Service) listen(addr *net.TCPAddr, conf *tls.Config) {

// Create new listener
logging.LogTarget("service", "starting the listener", listenAddr)
l, err := listener.New(listenAddr.String(), conf)
logging.LogTarget("service", "starting the listener", addr)
l, err := listener.New(addr.String(), conf)
if err != nil {
panic(err)
}
Expand Down
42 changes: 40 additions & 2 deletions broker/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Test_onHTTPPresence(t *testing.T) {
func TestPubsub(t *testing.T) {
cfg := config.NewDefault().(*config.Config)
cfg.License = testLicense
cfg.ListenAddr = ":9998"
cfg.ListenAddr = "127.0.0.1:9998"
cfg.Cluster = nil
cfg.TLS = &conf.TLSConfig{}

Expand All @@ -148,7 +148,7 @@ func TestPubsub(t *testing.T) {
go broker.Listen(context.Background())

// Create a client
cli, dialErr := net.Dial("tcp", ":9998")
cli, dialErr := net.Dial("tcp", "127.0.0.1:9998")
assert.NoError(t, dialErr)
defer cli.Close()

Expand All @@ -165,6 +165,19 @@ func TestPubsub(t *testing.T) {
assert.Equal(t, mqtt.TypeOfConnack, pkt.Type())
}

{ // Ping the broker
ping := mqtt.Pingreq{}
n, err := ping.EncodeTo(cli)
assert.Equal(t, 2, n)
assert.NoError(t, err)
}

{ // Read pong
pkt, err := mqtt.DecodePacket(cli)
assert.NoError(t, err)
assert.Equal(t, mqtt.TypeOfPingresp, pkt.Type())
}

{ // Subscribe to a topic
sub := mqtt.Subscribe{
Header: &mqtt.StaticHeader{QOS: 0},
Expand Down Expand Up @@ -202,4 +215,29 @@ func TestPubsub(t *testing.T) {
Payload: []byte("hello world"),
}, pkt)
}

{ // Unsubscribe from the topic
sub := mqtt.Unsubscribe{
Header: &mqtt.StaticHeader{QOS: 0},
Topics: []mqtt.TopicQOSTuple{
{Topic: []byte("0Nq8SWbL8qoOKEDqh_ebBepug6cLLlWO/a/b/c/"), Qos: 0},
},
}
_, err := sub.EncodeTo(cli)
assert.NoError(t, err)
}

{ // Read unsuback
pkt, err := mqtt.DecodePacket(cli)
assert.NoError(t, err)
assert.Equal(t, mqtt.TypeOfUnsuback, pkt.Type())
}

{ // Disconnect from the broker
disconnect := mqtt.Disconnect{}
n, err := disconnect.EncodeTo(cli)
assert.Equal(t, 2, n)
assert.NoError(t, err)
}

}
6 changes: 3 additions & 3 deletions broker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package broker

import (
"github.com/emitter-io/emitter/network/address"
"github.com/emitter-io/address"
"github.com/emitter-io/stats"
)

Expand All @@ -38,7 +38,7 @@ func (s *sampler) Snapshot() (snapshot []byte) {
stat := s.service.measurer
serv := s.service
node := address.Fingerprint(serv.LocalName())
addr := address.External().String()
addr := serv.Config.Addr()

// Track runtime information
stat.MeasureRuntime()
Expand All @@ -51,7 +51,7 @@ func (s *sampler) Snapshot() (snapshot []byte) {

// Add node tags
stat.Tag("node.id", node.String())
stat.Tag("node.addr", addr)
stat.Tag("node.addr", addr.String())

// Create a snaphshot of all stats
if m, ok := stat.(stats.Snapshotter); ok {
Expand Down
4 changes: 4 additions & 0 deletions broker/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/emitter-io/emitter/broker/message"
"github.com/emitter-io/emitter/config"
"github.com/emitter-io/emitter/security"
"github.com/emitter-io/stats"
"github.com/stretchr/testify/assert"
Expand All @@ -18,6 +19,9 @@ func Test_sendStats(t *testing.T) {
subscriptions: message.NewTrie(),
measurer: stats.New(),
License: license,
Config: &config.Config{
ListenAddr: ":1234",
},
}
defer s.Close()

Expand Down
22 changes: 17 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"net/http"
"strings"

"github.com/emitter-io/address"
cfg "github.com/emitter-io/config"
"github.com/emitter-io/emitter/network/address"
"github.com/emitter-io/emitter/provider/logging"
)

Expand All @@ -32,12 +32,12 @@ const (
)

// VaultUser is the vault user to use for authentication
var VaultUser = toUsername(address.External())
var VaultUser = toUsername(address.GetExternalOrDefault(address.Loopback))

// toUsername converts an ip address to a username for Vault.
func toUsername(a net.IP) string {
func toUsername(a net.IPAddr) string {
return strings.Replace(
strings.Replace(a.String(), ".", "-", -1),
strings.Replace(a.IP.String(), ".", "-", -1),
":", "-", -1)
}

Expand All @@ -50,7 +50,7 @@ func NewDefault() cfg.Config {
},
Cluster: &ClusterConfig{
ListenAddr: ":4000",
AdvertiseAddr: "public:4000",
AdvertiseAddr: "external:4000",
},
Storage: &cfg.ProviderConfig{
Provider: "inmemory",
Expand All @@ -60,6 +60,7 @@ func NewDefault() cfg.Config {

// Config represents main configuration.
type Config struct {
listenAddr *net.TCPAddr // The listen address, parsed.
ListenAddr string `json:"listen"` // The API port used for TCP & Websocket communication.
License string `json:"license"` // The license file to use for the broker.
TLS *cfg.TLSConfig `json:"tls,omitempty"` // The API port used for Secure TCP & Websocket communication.
Expand All @@ -72,6 +73,17 @@ type Config struct {
Monitor *cfg.ProviderConfig `json:"monitor,omitempty"` // The configuration for the monitoring storage.
}

// Addr returns the listen address configured.
func (c *Config) Addr() *net.TCPAddr {
if c.listenAddr == nil {
var err error
if c.listenAddr, err = address.Parse(c.ListenAddr, 8080); err != nil {
panic(err)
}
}
return c.listenAddr
}

// Vault returns a vault configuration.
func (c *Config) Vault() *cfg.VaultConfig {
return c.Secrets
Expand Down
17 changes: 17 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -15,3 +16,19 @@ func Test_NewDefaut(t *testing.T) {
assert.Nil(t, tls)
assert.False(t, ok)
}

func Test_Addr(t *testing.T) {
c := &Config{
ListenAddr: "private",
}

addr := c.Addr()
assert.True(t, strings.HasSuffix(addr.String(), ":8080"))
}

func Test_AddrInvalid(t *testing.T) {
assert.Panics(t, func() {
c := &Config{ListenAddr: "g3ew235wgs"}
c.Addr()
})
}
2 changes: 1 addition & 1 deletion emitter.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
},
"cluster": {
"listen": ":4000",
"advertise": "public:4000"
"advertise": "external:4000"
},
"storage": {
"provider": "inmemory"
Expand Down
Loading

0 comments on commit b2f0068

Please sign in to comment.