From 496379f2f7ba3d4b62001b146503b63b9b2eda37 Mon Sep 17 00:00:00 2001 From: devhindo Date: Mon, 6 Jan 2025 13:26:50 +0200 Subject: [PATCH 1/4] Add output mode flag and YAML file output option in Makefile Signed-off-by: devhindo --- Makefile | 6 ++++++ internal/config/config.go | 2 +- internal/config/types.go | 15 ++++++++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 248d2e77..df9dbeec 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,12 @@ run: nats go$(v) mod tidy; \ DEBUG=true GOPROXY=direct GOSUMDB=off go run main.go +.PHONY: file +## output to a YAML file +file: + go mod tidy; \ + DEBUG=true GOPROXY=direct GOSUMDB=off go run . -output file + .PHONY: check ## Lint check Meshsync. check: diff --git a/internal/config/config.go b/internal/config/config.go index b53a5cfe..06e054b4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -13,7 +13,7 @@ func New(provider string) (config.Handler, error) { err error ) opts := configprovider.Options{ - FilePath: utils.GetHome(), + FilePath: utils.GetHome() + "/.meshery", FileType: "yaml", FileName: "meshsync_config", } diff --git a/internal/config/types.go b/internal/config/types.go index 0b4d3a9a..41bc178b 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -1,6 +1,10 @@ package config -import "golang.org/x/exp/slices" +import ( + "flag" + + "golang.org/x/exp/slices" +) const ( ServerKey = "server-config" @@ -17,6 +21,15 @@ const ( InformerStore = "informer-store" ) +// Command line flag to determine the output mode +var ( + OutputMode string +) + +func init() { + flag.StringVar(&OutputMode, "output", "nats", "Output mode: 'file' or 'nats'") + flag.Parse() +} type PipelineConfigs []PipelineConfig func (p PipelineConfigs) Add(pc PipelineConfig) PipelineConfigs { From 618f98024bb2ba2feee7e1055891e7ffeacbdd9c Mon Sep 17 00:00:00 2001 From: devhindo Date: Mon, 6 Jan 2025 13:40:11 +0200 Subject: [PATCH 2/4] Add output mode handling for connectivity test in main function Signed-off-by: devhindo --- main.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 3d82c954..887ac29b 100644 --- a/main.go +++ b/main.go @@ -99,7 +99,17 @@ func main() { os.Exit(1) } //Skip/Comment the below connectivity test in local environment - connectivityTest(cfg.GetKey(config.BrokerURL), log) + + // check if output mode is nats + if config.OutputMode == "nats" { + connectivityTest(cfg.GetKey(config.BrokerURL), log) + } + + if config.OutputMode == "file" { + log.Info("Output mode is file") + return + } + // Initialize Broker instance br, err := nats.New(nats.Options{ URLS: []string{cfg.GetKey(config.BrokerURL)}, From 0aec3cfe6e32196b12e3d4f776843d8af1dae2cf Mon Sep 17 00:00:00 2001 From: devhindo Date: Mon, 6 Jan 2025 14:55:42 +0200 Subject: [PATCH 3/4] Implement file output mode for data handling and update logging accordingly Signed-off-by: devhindo --- internal/config/types.go | 1 - internal/pipeline/handlers.go | 2 +- main.go | 5 --- meshsync/handlers.go | 70 ++++++++++++++++++++++++++++------- 4 files changed, 58 insertions(+), 20 deletions(-) diff --git a/internal/config/types.go b/internal/config/types.go index 41bc178b..afb9bc13 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -28,7 +28,6 @@ var ( func init() { flag.StringVar(&OutputMode, "output", "nats", "Output mode: 'file' or 'nats'") - flag.Parse() } type PipelineConfigs []PipelineConfig diff --git a/internal/pipeline/handlers.go b/internal/pipeline/handlers.go index 934ca707..037a9025 100644 --- a/internal/pipeline/handlers.go +++ b/internal/pipeline/handlers.go @@ -19,7 +19,7 @@ func (ri *RegisterInformer) GetEventHandlers() cache.ResourceEventHandlerFuncs { if err != nil { ri.log.Error(err) } - ri.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) + // ri.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) }, UpdateFunc: func(oldObj, obj interface{}) { oldObjCasted := oldObj.(*unstructured.Unstructured) diff --git a/main.go b/main.go index 887ac29b..731dbbba 100644 --- a/main.go +++ b/main.go @@ -105,11 +105,6 @@ func main() { connectivityTest(cfg.GetKey(config.BrokerURL), log) } - if config.OutputMode == "file" { - log.Info("Output mode is file") - return - } - // Initialize Broker instance br, err := nats.New(nats.Options{ URLS: []string{cfg.GetKey(config.BrokerURL)}, diff --git a/meshsync/handlers.go b/meshsync/handlers.go index fff3f30c..bd4df260 100644 --- a/meshsync/handlers.go +++ b/meshsync/handlers.go @@ -1,15 +1,18 @@ package meshsync import ( + "encoding/json" "fmt" + "path/filepath" "time" - "encoding/json" + "os" + "flag" "github.com/layer5io/meshkit/broker" "github.com/layer5io/meshkit/utils" "github.com/layer5io/meshkit/utils/kubernetes" "github.com/layer5io/meshsync/internal/channels" - "github.com/layer5io/meshsync/internal/config" + config "github.com/layer5io/meshsync/internal/config" "github.com/layer5io/meshsync/pkg/model" "golang.org/x/net/context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -69,6 +72,11 @@ func (h *Handler) UpdateInformer() error { } func (h *Handler) ListenToRequests() { + + // Parse the command=line flags to get the output mode + flag.Parse() + + listenerConfigs := make(map[string]config.ListenerConfig, 10) err := h.Config.GetObject(config.ListenersKey, &listenerConfigs) if err != nil { @@ -115,16 +123,27 @@ func (h *Handler) ListenToRequests() { storeObjects := h.listStoreObjects() splitSlices := splitIntoMultipleSlices(storeObjects, 5) // performance of NATS is bound to degrade if huge messages are sent - h.Log.Info("Publishing the data from informer stores to the subject: ", replySubject) - for _, val := range splitSlices { - err = h.Broker.Publish(replySubject, &broker.Message{ - Object: val, + if config.OutputMode == "file" { + h.Log.Info("Writing the data from informer stores to the file") + for _, val := range splitSlices { + err = writeToFile(replySubject, val) + if err != nil { + h.Log.Error(err) + continue + } + } + } else { + h.Log.Info("Publishing the data from informer stores to the subject: ", replySubject) + for _, val := range splitSlices { + err = h.Broker.Publish(replySubject, &broker.Message{ + Object: val, }) if err != nil { h.Log.Error(err) continue } } + } case broker.ReSyncDiscoveryEntity: h.Log.Info("Resyncing") @@ -144,13 +163,23 @@ func (h *Handler) ListenToRequests() { continue } case "meshsync-meta": - h.Log.Info("Publishing MeshSync metadata to the subject") - err := h.Broker.Publish("meshsync-meta", &broker.Message{ - Object: config.Server["version"], - }) - if err != nil { - h.Log.Error(err) - continue + if config.OutputMode == "file" { + h.Log.Info("Writing MeshSync metadata to the file") + err = writeToFile("meshsync-meta", config.Server["version"]) + if err != nil { + h.Log.Error(err) + continue + } + } else { + h.Log.Info("Publishing MeshSync metadata to the subject") + + err := h.Broker.Publish("meshsync-meta", &broker.Message{ + Object: config.Server["version"], + }) + if err != nil { + h.Log.Error(err) + continue + } } } } @@ -262,3 +291,18 @@ func splitIntoMultipleSlices(s []model.KubernetesResource, maxItmsPerSlice int) return result } + +// writing to a file +func writeToFile(filename string, data interface{}) error { + filePath := filepath.Join("data", filename+".json") + file, err := os.Create(filePath) + if err != nil { + return err + } + + defer file.Close() + + enc := json.NewEncoder(file) + enc.SetIndent("", " ") + return enc.Encode(data) +} \ No newline at end of file From c38abce282fb3fcebbcd0adc77d7ff61f387dee0 Mon Sep 17 00:00:00 2001 From: devhindo Date: Tue, 7 Jan 2025 15:39:02 +0200 Subject: [PATCH 4/4] Add command-line flag parsing for output mode in main function Signed-off-by: devhindo --- internal/config/types.go | 3 ++ main.go | 77 ++++++++++++++++++++++------------------ meshsync/handlers.go | 3 -- 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/internal/config/types.go b/internal/config/types.go index afb9bc13..989e5553 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -28,6 +28,9 @@ var ( func init() { flag.StringVar(&OutputMode, "output", "nats", "Output mode: 'file' or 'nats'") + + // Parse the command=line flags to get the output mode + flag.Parse() } type PipelineConfigs []PipelineConfig diff --git a/main.go b/main.go index 731dbbba..8667996a 100644 --- a/main.go +++ b/main.go @@ -102,45 +102,52 @@ func main() { // check if output mode is nats if config.OutputMode == "nats" { - connectivityTest(cfg.GetKey(config.BrokerURL), log) - } - // Initialize Broker instance - br, err := nats.New(nats.Options{ - URLS: []string{cfg.GetKey(config.BrokerURL)}, - ConnectionName: "meshsync", - Username: "", - Password: "", - ReconnectWait: 2 * time.Second, - MaxReconnect: 60, - }) - if err != nil { - log.Error(err) - os.Exit(1) - } + log.Info("NATS output mode selected") - chPool := channels.NewChannelPool() - meshsyncHandler, err := meshsync.New(cfg, log, br, chPool) - if err != nil { - log.Error(err) - os.Exit(1) + connectivityTest(cfg.GetKey(config.BrokerURL), log) + + // Initialize Broker instance + br, err := nats.New(nats.Options{ + URLS: []string{cfg.GetKey(config.BrokerURL)}, + ConnectionName: "meshsync", + Username: "", + Password: "", + ReconnectWait: 2 * time.Second, + MaxReconnect: 60, + }) + if err != nil { + log.Error(err) + os.Exit(1) + } + + chPool := channels.NewChannelPool() + meshsyncHandler, err := meshsync.New(cfg, log, br, chPool) + if err != nil { + log.Error(err) + os.Exit(1) + } + + go meshsyncHandler.WatchCRDs() + + go meshsyncHandler.Run() + go meshsyncHandler.ListenToRequests() + + log.Info("Server started") + // Handle graceful shutdown + signal.Notify(chPool[channels.OS].(channels.OSChannel), syscall.SIGTERM, os.Interrupt) + select { + case <-chPool[channels.OS].(channels.OSChannel): + close(chPool[channels.Stop].(channels.StopChannel)) + log.Info("Shutting down") + case <-chPool[channels.Stop].(channels.StopChannel): + close(chPool[channels.Stop].(channels.StopChannel)) + log.Info("Shutting down") + } } - go meshsyncHandler.WatchCRDs() - - go meshsyncHandler.Run() - go meshsyncHandler.ListenToRequests() - - log.Info("Server started") - // Handle graceful shutdown - signal.Notify(chPool[channels.OS].(channels.OSChannel), syscall.SIGTERM, os.Interrupt) - select { - case <-chPool[channels.OS].(channels.OSChannel): - close(chPool[channels.Stop].(channels.StopChannel)) - log.Info("Shutting down") - case <-chPool[channels.Stop].(channels.StopChannel): - close(chPool[channels.Stop].(channels.StopChannel)) - log.Info("Shutting down") + if config.OutputMode == "file" { + log.Info("File output mode is not implemented yet") } } diff --git a/meshsync/handlers.go b/meshsync/handlers.go index bd4df260..3f1d81c9 100644 --- a/meshsync/handlers.go +++ b/meshsync/handlers.go @@ -6,7 +6,6 @@ import ( "path/filepath" "time" "os" - "flag" "github.com/layer5io/meshkit/broker" "github.com/layer5io/meshkit/utils" @@ -73,8 +72,6 @@ func (h *Handler) UpdateInformer() error { func (h *Handler) ListenToRequests() { - // Parse the command=line flags to get the output mode - flag.Parse() listenerConfigs := make(map[string]config.ListenerConfig, 10)