Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pushpalanka committed Jun 28, 2024
1 parent a213cae commit 62f72d1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 32 deletions.
55 changes: 26 additions & 29 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,54 +478,51 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []
// policies, report status, etc.
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error {
discoveryPlugin := discovery.Lookup(opa.manager)
bundlePlugin := bundle.Lookup(opa.manager)

if discoveryPlugin == nil {
return errors.New("discovery plugin not found")
}
if bundlePlugin == nil {
return errors.New("bundle plugin not found")
}

done := make(chan struct{})
defer close(done)
failed := make(chan error)
defer close(failed)
failed := make(chan error) //ToDO ensure these channels are closed.

discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
select {
case failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...)):
default:
}
failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...))
return
}

// Add listener for bundle plugin on discovery plugin OK, because bundlePlugin is dependent on the configuration
// discovered using the discovery configs
bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
select {
case failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...)):
default:
}
discoveryStatus := opa.manager.PluginStatus()["discovery"]
if discoveryStatus != nil && discoveryStatus.State == plugins.StateOK {
bundlePlugin := bundle.Lookup(opa.manager)
if bundlePlugin == nil {
failed <- fmt.Errorf("bundle plugin not found")
return
}

select {
case done <- struct{}{}:
default:
}
})
bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...))
return
}

close(done)
})
defer bundlePlugin.Unregister("startuplistener")
}
})
defer bundlePlugin.Unregister("startuplistener")

opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) {
for _, pluginstatus := range status {
if pluginstatus != nil && pluginstatus.State != plugins.StateOK {
return
}
discoveryStatus, ok := status["discovery"]
if !ok || discoveryStatus.State != plugins.StateOK {
return
}

bundleStatus, ok := status["bundle"]
if !ok || bundleStatus.State != plugins.StateOK {
return
}

close(done)
})
defer opa.manager.UnregisterPluginStatusListener("startuplistener")
Expand Down
6 changes: 3 additions & 3 deletions filters/openpolicyagent/openpolicyagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestOpaEngineStartFailureWithTimeoutForBundlePlugin(t *testing.T) {

err = engine.Start(ctx, cfg.startupTimeout)
assert.True(t, engine.stopped)
assert.Contains(t, err.Error(), "bundle plugin not found")
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s")
}

func TestOpaActivationSuccessWithDiscovery(t *testing.T) {
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle(t *testing.T) {

instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
assert.Nil(t, instance)
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded")
assert.Equal(t, 0, len(registry.instances))
}

Expand All @@ -353,7 +353,7 @@ func TestOpaActivationTimeOutWithDiscoveryParsingError(t *testing.T) {

instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
assert.Nil(t, instance)
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded")
assert.Equal(t, 0, len(registry.instances))
}

Expand Down

0 comments on commit 62f72d1

Please sign in to comment.