diff --git a/cmd/resource.go b/cmd/resource.go index a3c4213ee..6b6318541 100644 --- a/cmd/resource.go +++ b/cmd/resource.go @@ -83,7 +83,7 @@ func configureResource(operation string, cmd *cobra.Command, conf *config.Config }{ { "svc", - &conf.Resource.Services, + &conf.Resource.Service, }, { "deploy", @@ -161,7 +161,7 @@ func init() { resourceConfigRemoveCmd, ) // Add resource object flags as PersistentFlags to resourceConfigCmd - resourceConfigCmd.PersistentFlags().Bool("svc", false, "watch for services") + resourceConfigCmd.PersistentFlags().Bool("svc", false, "watch for Service") resourceConfigCmd.PersistentFlags().Bool("deploy", false, "watch for deployments") resourceConfigCmd.PersistentFlags().Bool("po", false, "watch for pods") resourceConfigCmd.PersistentFlags().Bool("rc", false, "watch for replication controllers") diff --git a/cmd/root.go b/cmd/root.go index f9fe4d7f8..af29b0d92 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -54,6 +54,7 @@ supported webhooks: logrus.Fatal(err) } config.CheckMissingResourceEnvvars() + config.UnmarshallConfig() c.Run(config) }, } diff --git a/config/config.go b/config/config.go index 37afac157..cfd2e5c40 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,7 @@ import ( "path/filepath" "runtime" + "github.com/Sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -44,7 +45,7 @@ type Resource struct { ReplicationController bool `json:"rc"` ReplicaSet bool `json:"rs"` DaemonSet bool `json:"ds"` - Services bool `json:"svc"` + Service bool `json:"svc"` Pod bool `json:"po"` Job bool `json:"job"` PersistentVolume bool `json:"pv"` @@ -54,13 +55,22 @@ type Resource struct { Ingress bool `json:"ing"` } +// Event struct for granular config +type Event struct { + Global []string `json:"string,omitempty"` + Create []string `json:"create,omitempty"` + Update []string `json:"update,omitempty"` + Delete []string `json:"delete,omitempty"` +} + // Config struct contains kubewatch configuration type Config struct { Handler Handler `json:"handler"` //Reason []string `json:"reason"` - Resource Resource `json:"resource"` + Resource Resource `json:"resource,omitempty"` // for watching specific namespace, leave it empty for watching all. // this config is ignored when watching namespaces + Event Event `json:"event,omitempty"` Namespace string `json:"namespace,omitempty"` } @@ -153,6 +163,7 @@ func (c *Config) Load() error { // CheckMissingResourceEnvvars will read the environment for equivalent config variables to set func (c *Config) CheckMissingResourceEnvvars() { + if !c.Resource.DaemonSet && os.Getenv("KW_DAEMONSET") == "true" { c.Resource.DaemonSet = true } @@ -171,8 +182,8 @@ func (c *Config) CheckMissingResourceEnvvars() { if !c.Resource.ReplicationController && os.Getenv("KW_REPLICATION_CONTROLLER") == "true" { c.Resource.ReplicationController = true } - if !c.Resource.Services && os.Getenv("KW_SERVICE") == "true" { - c.Resource.Services = true + if !c.Resource.Service && os.Getenv("KW_SERVICE") == "true" { + c.Resource.Service = true } if !c.Resource.Job && os.Getenv("KW_JOB") == "true" { c.Resource.Job = true @@ -197,6 +208,112 @@ func (c *Config) CheckMissingResourceEnvvars() { } } +func (c *Config) UnmarshallConfig() { + + // Resource Object Config add events under global scope + if c.Resource != (Resource{}) { + logrus.Info("Configuring Resources For Global Events") + if c.Resource.DaemonSet { + c.Event.Global = append(c.Event.Global, "demonset") + } + if c.Resource.ReplicaSet { + c.Event.Global = append(c.Event.Global, "replicaset") + } + if c.Resource.Namespace { + c.Event.Global = append(c.Event.Global, "namespace") + } + if c.Resource.Deployment { + c.Event.Global = append(c.Event.Global, "deployment") + } + if c.Resource.Pod { + c.Event.Global = append(c.Event.Global, "pod") + } + if c.Resource.ReplicationController { + c.Event.Global = append(c.Event.Global, "replicationcontroller") + } + if c.Resource.Service { + c.Event.Global = append(c.Event.Global, "service") + } + if c.Resource.Job { + c.Event.Global = append(c.Event.Global, "job") + } + if c.Resource.PersistentVolume { + c.Event.Global = append(c.Event.Global, "persistentvolume") + } + if c.Resource.Secret { + c.Event.Global = append(c.Event.Global, "secret") + } + if c.Resource.ConfigMap { + c.Event.Global = append(c.Event.Global, "configmap") + } + if c.Resource.Ingress { + c.Event.Global = append(c.Event.Global, "ingress") + } + } else { + // Configured using Events Config + logrus.Info("Configuring Resources Based on Events Config") + c.configureEvents(c.Event.Global) + c.configureEvents(c.Event.Create) + c.configureEvents(c.Event.Update) + c.configureEvents(c.Event.Delete) + } +} + +func (c *Config) configureEvents(s []string) { + for i := 0; i < len(s); i++ { + switch s[i] { + case "deployment": + { + c.Resource.Deployment = true + } + case "replicationcontroller": + { + c.Resource.ReplicationController = true + } + case "replicaset": + { + c.Resource.ReplicaSet = true + } + case "daemonset": + { + c.Resource.DaemonSet = true + } + case "service": + { + c.Resource.Service = true + } + case "pod": + { + c.Resource.Pod = true + } + case "job": + { + c.Resource.Job = true + } + case "persistentvolume": + { + c.Resource.PersistentVolume = true + } + case "namespace": + { + c.Resource.Namespace = true + } + case "secret": + { + c.Resource.Secret = true + } + case "configmap": + { + c.Resource.ConfigMap = true + } + case "ingress": + { + c.Resource.Ingress = true + } + } + } +} + func (c *Config) Write() error { b, err := yaml.Marshal(c) if err != nil { diff --git a/config/config_test.go b/config/config_test.go index f18bae3c6..6b26ae6f2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -39,7 +39,7 @@ var configStr = ` "replicationcontroller": "false", "replicaset": "false", "daemonset": "false", - "services": "false", + "Service": "false", "pod": "false", "secret": "true", "configmap": "true", diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 819018c1e..018e4e90b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "os/signal" + "strings" "syscall" "time" @@ -48,6 +49,12 @@ const maxRetries = 5 var serverStartTime time.Time +// Maps for holding events config +var global map[string]uint8 +var create map[string]uint8 +var delete map[string]uint8 +var update map[string]uint8 + // Event indicate the informerEvent type Event struct { key string @@ -67,6 +74,10 @@ type Controller struct { // Start prepares watchers and run their controllers, then waits for process termination signals func Start(conf *config.Config, eventHandler handlers.Handler) { + + // loads events config into memory for granular alerting + loadEventConfig(conf) + var kubeClient kubernetes.Interface _, err := rest.InClusterConfig() if err != nil { @@ -140,7 +151,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { go c.Run(stopCh) } - if conf.Resource.Services { + if conf.Resource.Service { informer := cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) { @@ -462,13 +473,22 @@ func (c *Controller) processItem(newEvent Event) error { // get object's metedata objectMeta := utils.GetObjectMetaData(obj) + // namespace retrived from event key incase namespace value is empty + if newEvent.namespace == "" { + newEvent.namespace = strings.Split(newEvent.key, "/")[0] + } + // process events based on its type switch newEvent.eventType { case "create": // compare CreationTimestamp and serverStartTime and alert only on latest events // Could be Replaced by using Delta or DeltaFIFO if objectMeta.CreationTimestamp.Sub(serverStartTime).Seconds() > 0 { - c.eventHandler.ObjectCreated(obj) + if _, ok := global[newEvent.resourceType]; ok { + c.eventHandler.ObjectCreated(obj) + } else if _, ok := create[newEvent.resourceType]; ok { + c.eventHandler.ObjectCreated(obj) + } return nil } case "update": @@ -476,10 +496,15 @@ func (c *Controller) processItem(newEvent Event) error { - enahace update event processing in such a way that, it send alerts about what got changed. */ kbEvent := event.Event{ - Kind: newEvent.resourceType, - Name: newEvent.key, + Kind: newEvent.resourceType, + Name: newEvent.key, + Namespace: newEvent.namespace, + } + if _, ok := global[newEvent.resourceType]; ok { + c.eventHandler.ObjectUpdated(obj, kbEvent) + } else if _, ok := update[newEvent.resourceType]; ok { + c.eventHandler.ObjectUpdated(obj, kbEvent) } - c.eventHandler.ObjectUpdated(obj, kbEvent) return nil case "delete": kbEvent := event.Event{ @@ -487,8 +512,48 @@ func (c *Controller) processItem(newEvent Event) error { Name: newEvent.key, Namespace: newEvent.namespace, } - c.eventHandler.ObjectDeleted(kbEvent) + if _, ok := global[newEvent.resourceType]; ok { + c.eventHandler.ObjectDeleted(kbEvent) + } else if _, ok := delete[newEvent.resourceType]; ok { + c.eventHandler.ObjectDeleted(kbEvent) + } return nil } return nil } + +// loadEventConfig loads event list from Event config for granular alerting +func loadEventConfig(c *config.Config) { + + // Load Global events + if len(c.Event.Global) > 0 { + global = make(map[string]uint8) + for _, r := range c.Event.Global { + global[r] = 0 + } + } + + // Load Create events + if len(c.Event.Create) > 0 { + create = make(map[string]uint8) + for _, r := range c.Event.Create { + create[r] = 0 + } + } + + // Load Update events + if len(c.Event.Update) > 0 { + update = make(map[string]uint8) + for _, r := range c.Event.Update { + update[r] = 0 + } + } + + // Load Delete events + if len(c.Event.Delete) > 0 { + delete = make(map[string]uint8) + for _, r := range c.Event.Delete { + delete[r] = 0 + } + } +}