diff --git a/Makefile b/Makefile index 248d2e77..df9dbeec 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,12 @@ run: nats go$(v) mod tidy; \ DEBUG=true GOPROXY=direct GOSUMDB=off go run main.go +.PHONY: file +## output to a YAML file +file: + go mod tidy; \ + DEBUG=true GOPROXY=direct GOSUMDB=off go run . -output file + .PHONY: check ## Lint check Meshsync. check: diff --git a/internal/config/config.go b/internal/config/config.go index b53a5cfe..06e054b4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -13,7 +13,7 @@ func New(provider string) (config.Handler, error) { err error ) opts := configprovider.Options{ - FilePath: utils.GetHome(), + FilePath: utils.GetHome() + "/.meshery", FileType: "yaml", FileName: "meshsync_config", } diff --git a/internal/config/types.go b/internal/config/types.go index 0b4d3a9a..989e5553 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -1,6 +1,10 @@ package config -import "golang.org/x/exp/slices" +import ( + "flag" + + "golang.org/x/exp/slices" +) const ( ServerKey = "server-config" @@ -17,6 +21,17 @@ const ( InformerStore = "informer-store" ) +// Command line flag to determine the output mode +var ( + OutputMode string +) + +func init() { + flag.StringVar(&OutputMode, "output", "nats", "Output mode: 'file' or 'nats'") + + // Parse the command=line flags to get the output mode + flag.Parse() +} type PipelineConfigs []PipelineConfig func (p PipelineConfigs) Add(pc PipelineConfig) PipelineConfigs { diff --git a/internal/pipeline/handlers.go b/internal/pipeline/handlers.go index 934ca707..037a9025 100644 --- a/internal/pipeline/handlers.go +++ b/internal/pipeline/handlers.go @@ -19,7 +19,7 @@ func (ri *RegisterInformer) GetEventHandlers() cache.ResourceEventHandlerFuncs { if err != nil { ri.log.Error(err) } - ri.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) + // ri.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) }, UpdateFunc: func(oldObj, obj interface{}) { oldObjCasted := oldObj.(*unstructured.Unstructured) diff --git a/main.go b/main.go index 3d82c954..8667996a 100644 --- a/main.go +++ b/main.go @@ -99,43 +99,55 @@ func main() { os.Exit(1) } //Skip/Comment the below connectivity test in local environment - connectivityTest(cfg.GetKey(config.BrokerURL), log) - // Initialize Broker instance - br, err := nats.New(nats.Options{ - URLS: []string{cfg.GetKey(config.BrokerURL)}, - ConnectionName: "meshsync", - Username: "", - Password: "", - ReconnectWait: 2 * time.Second, - MaxReconnect: 60, - }) - if err != nil { - log.Error(err) - os.Exit(1) - } - chPool := channels.NewChannelPool() - meshsyncHandler, err := meshsync.New(cfg, log, br, chPool) - if err != nil { - log.Error(err) - os.Exit(1) + // check if output mode is nats + if config.OutputMode == "nats" { + + log.Info("NATS output mode selected") + + connectivityTest(cfg.GetKey(config.BrokerURL), log) + + // Initialize Broker instance + br, err := nats.New(nats.Options{ + URLS: []string{cfg.GetKey(config.BrokerURL)}, + ConnectionName: "meshsync", + Username: "", + Password: "", + ReconnectWait: 2 * time.Second, + MaxReconnect: 60, + }) + if err != nil { + log.Error(err) + os.Exit(1) + } + + chPool := channels.NewChannelPool() + meshsyncHandler, err := meshsync.New(cfg, log, br, chPool) + if err != nil { + log.Error(err) + os.Exit(1) + } + + go meshsyncHandler.WatchCRDs() + + go meshsyncHandler.Run() + go meshsyncHandler.ListenToRequests() + + log.Info("Server started") + // Handle graceful shutdown + signal.Notify(chPool[channels.OS].(channels.OSChannel), syscall.SIGTERM, os.Interrupt) + select { + case <-chPool[channels.OS].(channels.OSChannel): + close(chPool[channels.Stop].(channels.StopChannel)) + log.Info("Shutting down") + case <-chPool[channels.Stop].(channels.StopChannel): + close(chPool[channels.Stop].(channels.StopChannel)) + log.Info("Shutting down") + } } - go meshsyncHandler.WatchCRDs() - - go meshsyncHandler.Run() - go meshsyncHandler.ListenToRequests() - - log.Info("Server started") - // Handle graceful shutdown - signal.Notify(chPool[channels.OS].(channels.OSChannel), syscall.SIGTERM, os.Interrupt) - select { - case <-chPool[channels.OS].(channels.OSChannel): - close(chPool[channels.Stop].(channels.StopChannel)) - log.Info("Shutting down") - case <-chPool[channels.Stop].(channels.StopChannel): - close(chPool[channels.Stop].(channels.StopChannel)) - log.Info("Shutting down") + if config.OutputMode == "file" { + log.Info("File output mode is not implemented yet") } } diff --git a/meshsync/handlers.go b/meshsync/handlers.go index fff3f30c..3f1d81c9 100644 --- a/meshsync/handlers.go +++ b/meshsync/handlers.go @@ -1,15 +1,17 @@ package meshsync import ( + "encoding/json" "fmt" + "path/filepath" "time" - "encoding/json" + "os" "github.com/layer5io/meshkit/broker" "github.com/layer5io/meshkit/utils" "github.com/layer5io/meshkit/utils/kubernetes" "github.com/layer5io/meshsync/internal/channels" - "github.com/layer5io/meshsync/internal/config" + config "github.com/layer5io/meshsync/internal/config" "github.com/layer5io/meshsync/pkg/model" "golang.org/x/net/context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -69,6 +71,9 @@ func (h *Handler) UpdateInformer() error { } func (h *Handler) ListenToRequests() { + + + listenerConfigs := make(map[string]config.ListenerConfig, 10) err := h.Config.GetObject(config.ListenersKey, &listenerConfigs) if err != nil { @@ -115,16 +120,27 @@ func (h *Handler) ListenToRequests() { storeObjects := h.listStoreObjects() splitSlices := splitIntoMultipleSlices(storeObjects, 5) // performance of NATS is bound to degrade if huge messages are sent - h.Log.Info("Publishing the data from informer stores to the subject: ", replySubject) - for _, val := range splitSlices { - err = h.Broker.Publish(replySubject, &broker.Message{ - Object: val, + if config.OutputMode == "file" { + h.Log.Info("Writing the data from informer stores to the file") + for _, val := range splitSlices { + err = writeToFile(replySubject, val) + if err != nil { + h.Log.Error(err) + continue + } + } + } else { + h.Log.Info("Publishing the data from informer stores to the subject: ", replySubject) + for _, val := range splitSlices { + err = h.Broker.Publish(replySubject, &broker.Message{ + Object: val, }) if err != nil { h.Log.Error(err) continue } } + } case broker.ReSyncDiscoveryEntity: h.Log.Info("Resyncing") @@ -144,13 +160,23 @@ func (h *Handler) ListenToRequests() { continue } case "meshsync-meta": - h.Log.Info("Publishing MeshSync metadata to the subject") - err := h.Broker.Publish("meshsync-meta", &broker.Message{ - Object: config.Server["version"], - }) - if err != nil { - h.Log.Error(err) - continue + if config.OutputMode == "file" { + h.Log.Info("Writing MeshSync metadata to the file") + err = writeToFile("meshsync-meta", config.Server["version"]) + if err != nil { + h.Log.Error(err) + continue + } + } else { + h.Log.Info("Publishing MeshSync metadata to the subject") + + err := h.Broker.Publish("meshsync-meta", &broker.Message{ + Object: config.Server["version"], + }) + if err != nil { + h.Log.Error(err) + continue + } } } } @@ -262,3 +288,18 @@ func splitIntoMultipleSlices(s []model.KubernetesResource, maxItmsPerSlice int) return result } + +// writing to a file +func writeToFile(filename string, data interface{}) error { + filePath := filepath.Join("data", filename+".json") + file, err := os.Create(filePath) + if err != nil { + return err + } + + defer file.Close() + + enc := json.NewEncoder(file) + enc.SetIndent("", " ") + return enc.Encode(data) +} \ No newline at end of file