From 73bdf5a6284ef1bad65a3ef0b9634cb059883fe3 Mon Sep 17 00:00:00 2001 From: Pushpalanka Jayawardhana Date: Fri, 28 Jun 2024 17:44:12 +0200 Subject: [PATCH] Add waiting time for discovery bundle to be OK. --- filters/openpolicyagent/openpolicyagent.go | 64 +++++++++++++++------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 83a1046bb6..706c3bf138 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -44,6 +44,7 @@ const ( defaultReuseDuration = 30 * time.Second defaultShutdownGracePeriod = 30 * time.Second DefaultOpaStartupTimeout = 30 * time.Second + DefaultWaitToDiscover = 5 * time.Second DefaultMaxRequestBodySize = 1 << 20 // 1 MB DefaultMaxMemoryBodyParsing = 100 * DefaultMaxRequestBodySize @@ -484,32 +485,36 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura } done := make(chan struct{}) - failed := make(chan error) //ToDO ensure these channels are closed. + failed := make(chan error, 1) + + // Create a context with a timeout for discovery plugin readiness + discoveryCtx, discoveryCancel := context.WithTimeout(ctx, DefaultWaitToDiscover) + defer discoveryCancel() + + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + discoveryStatus := opa.manager.PluginStatus()["discovery"] + if discoveryStatus != nil && discoveryStatus.State == plugins.StateOK { + registerBundlePluginListener(opa, failed, done) + return + } + case <-discoveryCtx.Done(): + failed <- fmt.Errorf("discovery plugin did not reach OK state within timeout") + return + } + } + }() discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) { if len(status.Errors) > 0 { failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...)) return } - - discoveryStatus := opa.manager.PluginStatus()["discovery"] - if discoveryStatus != nil && discoveryStatus.State == plugins.StateOK { - bundlePlugin := bundle.Lookup(opa.manager) - if bundlePlugin == nil { - failed <- fmt.Errorf("bundle plugin not found") - return - } - - bundlePlugin.Register("startuplistener", func(status bundle.Status) { - if len(status.Errors) > 0 { - failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...)) - return - } - - close(done) - }) - defer bundlePlugin.Unregister("startuplistener") - } }) opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) { @@ -553,6 +558,25 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura } } +func registerBundlePluginListener(opa *OpenPolicyAgentInstance, failed chan error, done chan struct{}) { + bundlePlugin := bundle.Lookup(opa.manager) + if bundlePlugin == nil { + failed <- fmt.Errorf("bundle plugin not found") + return + } + + bundlePlugin.Register("startuplistener", func(status bundle.Status) { + if len(status.Errors) > 0 { + failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...)) + return + } + + close(done) + }) + defer bundlePlugin.Unregister("startuplistener") + return +} + func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { opa.once.Do(func() { opa.manager.Stop(ctx)