Skip to content

Commit

Permalink
Attempt to handle errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pushpalanka committed Jun 26, 2024
1 parent 2678aa1 commit a213cae
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
37 changes: 29 additions & 8 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,21 +480,43 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura
discoveryPlugin := discovery.Lookup(opa.manager)
bundlePlugin := bundle.Lookup(opa.manager)

if discoveryPlugin == nil {
return errors.New("discovery plugin not found")
}
if bundlePlugin == nil {
return errors.New("bundle plugin not found")
}

done := make(chan struct{})
defer close(done)
failed := make(chan error)
defer close(failed)

discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...))
select {
case failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...)):
default:
}
return
}
})

bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...))
}
// Add listener for bundle plugin on discovery plugin OK, because bundlePlugin is dependent on the configuration
// discovered using the discovery configs
bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
select {
case failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...)):
default:
}
return
}

select {
case done <- struct{}{}:
default:
}
})
})
defer bundlePlugin.Unregister("startuplistener")

Expand All @@ -518,7 +540,6 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura
return nil
case err := <-failed:
opa.Close(ctx)

return err
case <-ctx.Done():
for pluginName, status := range opa.manager.PluginStatus() {
Expand All @@ -531,7 +552,7 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura
}
}
opa.Close(ctx)
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err)
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, ctx.Err())
}
}

Expand Down
4 changes: 2 additions & 2 deletions filters/openpolicyagent/openpolicyagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestRegistry(t *testing.T) {
assert.Error(t, err, "should not work after close")
}

func TestOpaEngineStartFailureWithTimeout(t *testing.T) {
func TestOpaEngineStartFailureWithTimeoutForBundlePlugin(t *testing.T) {
_, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle")

registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
Expand All @@ -258,7 +258,7 @@ func TestOpaEngineStartFailureWithTimeout(t *testing.T) {

err = engine.Start(ctx, cfg.startupTimeout)
assert.True(t, engine.stopped)
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s")
assert.Contains(t, err.Error(), "bundle plugin not found")
}

func TestOpaActivationSuccessWithDiscovery(t *testing.T) {
Expand Down

0 comments on commit a213cae

Please sign in to comment.