Skip to content

Commit

Permalink
Add waiting time for discovery bundle to be OK.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pushpalanka committed Jun 28, 2024
1 parent 62f72d1 commit 73bdf5a
Showing 1 changed file with 44 additions and 20 deletions.
64 changes: 44 additions & 20 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
defaultReuseDuration = 30 * time.Second
defaultShutdownGracePeriod = 30 * time.Second
DefaultOpaStartupTimeout = 30 * time.Second
DefaultWaitToDiscover = 5 * time.Second

DefaultMaxRequestBodySize = 1 << 20 // 1 MB
DefaultMaxMemoryBodyParsing = 100 * DefaultMaxRequestBodySize
Expand Down Expand Up @@ -484,32 +485,36 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura
}

done := make(chan struct{})
failed := make(chan error) //ToDO ensure these channels are closed.
failed := make(chan error, 1)

// Create a context with a timeout for discovery plugin readiness
discoveryCtx, discoveryCancel := context.WithTimeout(ctx, DefaultWaitToDiscover)
defer discoveryCancel()

go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
discoveryStatus := opa.manager.PluginStatus()["discovery"]
if discoveryStatus != nil && discoveryStatus.State == plugins.StateOK {
registerBundlePluginListener(opa, failed, done)
return
}
case <-discoveryCtx.Done():
failed <- fmt.Errorf("discovery plugin did not reach OK state within timeout")
return
}
}
}()

discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...))
return
}

discoveryStatus := opa.manager.PluginStatus()["discovery"]
if discoveryStatus != nil && discoveryStatus.State == plugins.StateOK {
bundlePlugin := bundle.Lookup(opa.manager)
if bundlePlugin == nil {
failed <- fmt.Errorf("bundle plugin not found")
return
}

bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...))
return
}

close(done)
})
defer bundlePlugin.Unregister("startuplistener")
}
})

opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) {
Expand Down Expand Up @@ -553,6 +558,25 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura
}
}

func registerBundlePluginListener(opa *OpenPolicyAgentInstance, failed chan error, done chan struct{}) {
bundlePlugin := bundle.Lookup(opa.manager)
if bundlePlugin == nil {
failed <- fmt.Errorf("bundle plugin not found")
return
}

bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...))
return
}

close(done)
})
defer bundlePlugin.Unregister("startuplistener")
return

Check failure on line 577 in filters/openpolicyagent/openpolicyagent.go

View workflow job for this annotation

GitHub Actions / tests

redundant return statement (S1023)
}

func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
opa.once.Do(func() {
opa.manager.Stop(ctx)
Expand Down

0 comments on commit 73bdf5a

Please sign in to comment.