Skip to content

Commit

Permalink
Added debug mode configuration (#255)
Browse files Browse the repository at this point in the history
This PR adds a debug mode flag in the config. Ppprof HTTP endpoints are now by default disabled and you'll need to flip the flag on to enable them.
  • Loading branch information
Roman Atachiants authored Jun 16, 2019
1 parent f660e64 commit 1f69f19
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 35 deletions.
1 change: 1 addition & 0 deletions internal/broker/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func newBenchClient(port int) *testConn {
func newTestBroker(port int, licenseVersion int) *Service {
cfg := config.NewDefault().(*config.Config)
cfg.License = testLicense
cfg.Debug = true
if licenseVersion == 2 {
cfg.License = testLicenseV2
}
Expand Down
33 changes: 5 additions & 28 deletions internal/broker/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/emitter-io/emitter/internal/provider/contract"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/security/hash"
Expand All @@ -43,27 +42,6 @@ var (

// ------------------------------------------------------------------------------------

// Authorize attempts to authorize a channel with its key
func (c *Conn) authorize(channel *security.Channel, permission uint8) (contract.Contract, security.Key, bool) {

// Attempt to parse the key
key, err := c.service.Cipher.DecryptKey(channel.Key)
if err != nil || key.IsExpired() {
return nil, nil, false
}

// Attempt to fetch the contract using the key. Underneath, it's cached.
contract, contractFound := c.service.contracts.Get(key.Contract())
if !contractFound || !contract.Validate(key) || !key.HasPermission(permission) || !key.ValidateChannel(channel) {
return nil, nil, false
}

// Return the contract and the key
return contract, key, true
}

// ------------------------------------------------------------------------------------

// onConnect handles the connection authorization
func (c *Conn) onConnect(packet *mqtt.Connect) bool {
c.username = string(packet.Username)
Expand All @@ -82,7 +60,7 @@ func (c *Conn) onSubscribe(mqttTopic []byte) *Error {
}

// Check the authorization and permissions
contract, key, allowed := c.authorize(channel, security.AllowRead)
contract, key, allowed := c.service.authorize(channel, security.AllowRead)
if !allowed {
return ErrUnauthorized
}
Expand Down Expand Up @@ -131,7 +109,7 @@ func (c *Conn) onUnsubscribe(mqttTopic []byte) *Error {
}

// Check the authorization and permissions
contract, key, allowed := c.authorize(channel, security.AllowRead)
contract, key, allowed := c.service.authorize(channel, security.AllowRead)
if !allowed {
return ErrUnauthorized
}
Expand Down Expand Up @@ -170,7 +148,7 @@ func (c *Conn) onPublish(packet *mqtt.Publish) *Error {
}

// Check the authorization and permissions
contract, key, allowed := c.authorize(channel, security.AllowWrite)
contract, key, allowed := c.service.authorize(channel, security.AllowWrite)
if !allowed {
return ErrUnauthorized
}
Expand Down Expand Up @@ -277,7 +255,7 @@ func (c *Conn) onLink(payload []byte) (response, bool) {
c.links[request.Name] = channel.String()

// If an auto-subscribe was requested and the key has read permissions, subscribe
if _, key, allowed := c.authorize(channel, security.AllowRead); allowed && request.Subscribe {
if _, key, allowed := c.service.authorize(channel, security.AllowRead); allowed && request.Subscribe {
c.Subscribe(message.NewSsid(key.Contract(), channel.Query), channel.Channel)
}

Expand All @@ -296,7 +274,7 @@ func (c *Conn) makePrivateChannel(chanKey, chanName string) *security.Channel {
}

// Make sure we can actually extend it
_, key, allowed := c.authorize(channel, security.AllowExtend)
_, key, allowed := c.service.authorize(channel, security.AllowExtend)
if !allowed {
return nil
}
Expand Down Expand Up @@ -400,7 +378,6 @@ func getAllPresence(s *Service, ssid message.Ssid) []presenceInfo {

// onPresence processes a presence request.
func (c *Conn) onPresence(payload []byte) (response, bool) {
// Deserialize the payload.
msg := presenceRequest{
Status: true, // Default: send status info
Changes: nil, // Default: send all changes
Expand Down
31 changes: 26 additions & 5 deletions internal/broker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,16 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)

// Create a new HTTP request multiplexer
mux := http.NewServeMux()
if cfg.Debug {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
mux.HandleFunc("/health", s.onHealth)
mux.HandleFunc("/keygen", handleKeyGen(s))
mux.HandleFunc("/presence", s.onHTTPPresence)
mux.HandleFunc("/debug/pprof/", pprof.Index) // TODO: use config flag to enable/disable this
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) // TODO: use config flag to enable/disable this
mux.HandleFunc("/debug/pprof/profile", pprof.Profile) // TODO: use config flag to enable/disable this
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) // TODO: use config flag to enable/disable this
mux.HandleFunc("/debug/pprof/trace", pprof.Trace) // TODO: use config flag to enable/disable this
mux.HandleFunc("/", s.onRequest)

// Attach handlers
Expand Down Expand Up @@ -441,6 +443,25 @@ func (s *Service) publish(m *message.Message, exclude string) (n int64) {
return
}

// Authorize attempts to authorize a channel with its key
func (s *Service) authorize(channel *security.Channel, permission uint8) (contract.Contract, security.Key, bool) {

// Attempt to parse the key
key, err := s.Cipher.DecryptKey(channel.Key)
if err != nil || key.IsExpired() {
return nil, nil, false
}

// Attempt to fetch the contract using the key. Underneath, it's cached.
contract, contractFound := s.contracts.Get(key.Contract())
if !contractFound || !contract.Validate(key) || !key.HasPermission(permission) || !key.ValidateChannel(channel) {
return nil, nil, false
}

// Return the contract and the key
return contract, key, true
}

// SelfPublish publishes a message to itself.
func (s *Service) selfPublish(channelName string, payload []byte) {
channel := security.ParseChannel([]byte("emitter/" + channelName))
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func New(filename string, stores ...cfg.SecretStore) *Config {
type Config struct {
ListenAddr string `json:"listen"` // The API port used for TCP & Websocket communication.
License string `json:"license"` // The license file to use for the broker.
Debug bool `json:"debug,omitempty"` // The debug mode flag.
Limit LimitConfig `json:"limit,omitempty"` // Configuration for various limits such as message size.
TLS *cfg.TLSConfig `json:"tls,omitempty"` // The API port used for Secure TCP & Websocket communication.
Cluster *ClusterConfig `json:"cluster,omitempty"` // The configuration for the clustering.
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func main() {

// Register sub-commands
app.Command("load", "Runs the load testing client for emitter.", load.Run)
app.Command("license", "Manipulates licenses and secret keys.", func(config *cli.Cmd) {
config.Command("new", "Generates a new license and secret key pair.", license.New)
app.Command("license", "Manipulates licenses and secret keys.", func(cmd *cli.Cmd) {
cmd.Command("new", "Generates a new license and secret key pair.", license.New)
// TODO: add more sub-commands for license
})

Expand Down

0 comments on commit 1f69f19

Please sign in to comment.