Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OPA: Fail fast on discovery or bundle download errors #3120

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
163 changes: 118 additions & 45 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/plugins/bundle"
"github.com/open-policy-agent/opa/plugins/discovery"
"github.com/open-policy-agent/opa/rego"
"github.com/open-policy-agent/opa/runtime"
Expand Down Expand Up @@ -50,6 +51,11 @@ const (
DefaultRequestBodyBufferSize = 8 * 1024 // 8 KB

spanNameEval = "open-policy-agent"

GeneralPluginStatusStartupListener = "general-plugin-status-startup"
DiscoveryPluginStartupListener = "skipper-instance-startup-discovery"
PluginStatusStartupListener = "skipper-instance-startup-plugin"
BundlePluginStartupListener = "skipper-instance-startup-bundle"
)

type OpenPolicyAgentRegistry struct {
Expand Down Expand Up @@ -363,16 +369,18 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s
}

type OpenPolicyAgentInstance struct {
manager *plugins.Manager
instanceConfig OpenPolicyAgentInstanceConfig
opaConfig *config.Config
bundleName string
preparedQuery *rego.PreparedEvalQuery
preparedQueryDoOnce *sync.Once
interQueryBuiltinCache iCache.InterQueryCache
once sync.Once
stopped bool
registry *OpenPolicyAgentRegistry
manager *plugins.Manager
instanceConfig OpenPolicyAgentInstanceConfig
opaConfig *config.Config
bundleName string
preparedQuery *rego.PreparedEvalQuery
preparedQueryDoOnce *sync.Once
interQueryBuiltinCache iCache.InterQueryCache
once sync.Once
stopped bool
registry *OpenPolicyAgentRegistry
registerBundleListenerOnce *sync.Once
registerDiscoveryListenerOnce *sync.Once

maxBodyBytes int64
bodyReadBufferSize int64
Expand Down Expand Up @@ -469,7 +477,9 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []
preparedQueryDoOnce: new(sync.Once),
interQueryBuiltinCache: iCache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig()),

idGenerator: uniqueIDGenerator,
idGenerator: uniqueIDGenerator,
registerDiscoveryListenerOnce: new(sync.Once),
registerBundleListenerOnce: new(sync.Once),
}

manager.RegisterCompilerTrigger(opa.compilerUpdated)
Expand All @@ -480,38 +490,77 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []
// Start asynchronously starts the policy engine's plugins that download
// policies, report status, etc.
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error {
err := opa.manager.Start(ctx)

if err != nil {
return err
}
discoveryPlugin := discovery.Lookup(opa.manager)

// check readiness of all plugins
pluginsReady := func() bool {
for _, status := range opa.manager.PluginStatus() {
if status != nil && status.State != plugins.StateOK {
return false
done := make(chan struct{})
failed := make(chan error, 1)

opa.registerDiscoveryListenerOnce.Do(func() {

discoveryPlugin.RegisterListener(DiscoveryPluginStartupListener, func(status bundle.Status) {
handleStatusErrors(status, failed, "discovery plugin")
})
//defer discoveryPlugin.Unregister(DiscoveryPluginStartupListener) //ToDo
})

opa.manager.RegisterPluginStatusListener(PluginStatusStartupListener, func(status map[string]*plugins.Status) {
if _, exists := status["bundle"]; exists {
bundlePlugin := bundle.Lookup(opa.manager)
if bundlePlugin != nil {
opa.registerBundleListenerOnce.Do(func() {
bundlePlugin.Register(BundlePluginStartupListener, func(status bundle.Status) {
handleStatusErrors(status, failed, "bundle plugin")
//defer bundlePlugin.Unregister(BundlePluginStartupListener) //ToDo
})
})
}
}
return true
}
})
defer opa.manager.UnregisterPluginStatusListener(PluginStatusStartupListener)

err = waitFunc(ctx, pluginsReady, 100*time.Millisecond)
// Register listener for general plugin status checks
opa.manager.RegisterPluginStatusListener(GeneralPluginStatusStartupListener, func(status map[string]*plugins.Status) {
for _, pluginStatus := range status {
if pluginStatus != nil && pluginStatus.State != plugins.StateOK {
return
}
}
close(done)
})
defer opa.manager.UnregisterPluginStatusListener(GeneralPluginStatusStartupListener)

err := opa.manager.Start(ctx)
if err != nil {
return err
}

select {
case <-ctx.Done():
timeoutErr := ctx.Err()
Pushpalanka marked this conversation as resolved.
Show resolved Hide resolved

for pluginName, status := range opa.manager.PluginStatus() {
if status != nil && status.State != plugins.StateOK {
opa.Logger().WithFields(map[string]interface{}{
"plugin_name": pluginName,
"plugin_state": status.State,
"error_message": status.Message,
}).Error("Open policy agent plugin did not start in time")
}).Error("Open policy agent plugin: %v did not start in time", pluginName)
}
}
opa.Close(ctx)
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err)
if timeoutErr != nil {
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, timeoutErr)
}
return fmt.Errorf("one or more open policy agent plugins failed to start in %v", timeout)

case <-done:
return nil
case err := <-failed:
opa.Close(ctx)

return err
}
return nil
}

func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
Expand All @@ -521,25 +570,6 @@ func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
})
}

// waitFunc polls fun until it reports true or ctx is done.
//
// fun is evaluated once immediately; if it already returns true no ticker is
// created. Otherwise fun is re-evaluated on every tick of interval. When ctx
// is done before fun succeeds, the returned error wraps ctx.Err().
func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error {
	ready := fun()
	if ready {
		return nil
	}

	poll := time.NewTicker(interval)
	defer poll.Stop()

	for !ready {
		select {
		case <-poll.C:
			ready = fun()
		case <-ctx.Done():
			return fmt.Errorf("timed out while starting: %w", ctx.Err())
		}
	}
	return nil
}

func configLabelsInfo(opaConfig config.Config) func(*plugins.Manager) {
info := ast.NewObject()
labels := ast.NewObject()
Expand Down Expand Up @@ -796,3 +826,46 @@ func (l *QuietLogger) Error(fmt string, a ...interface{}) {
// Warn forwards a warning-level message to the wrapped target logger.
//
// The variadic arguments are expanded with a... so that each value is matched
// against its own verb in fmt. Passing the slice itself would hand the target
// a single argument: the first verb would print the whole slice and every
// remaining verb would be left unfilled.
func (l *QuietLogger) Warn(fmt string, a ...interface{}) {
	l.target.Warn(fmt, a...)
}

// temporaryClientErrorHTTPCodes is the set of 4xx HTTP status codes that
// isTemporaryError classifies as temporary.
var temporaryClientErrorHTTPCodes = map[int64]struct{}{
	408: {}, // request timeout
	429: {}, // too many requests
}

// isTemporaryError reports whether code is one of the HTTP status codes
// listed in temporaryClientErrorHTTPCodes.
func isTemporaryError(code int64) bool {
	if _, temporary := temporaryClientErrorHTTPCodes[code]; temporary {
		return true
	}
	return false
}

func handleStatusErrors(
status bundle.Status,
failed chan error,
prefix string,
) {
if status.Code == "bundle_error" {
Pushpalanka marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still feels like we are only handling a subset of errors; is this really reliable? My understanding was that the Code only gives information about what went wrong and that length(status.Errors) essentially tells you whether anything went wrong…

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That indeed is more intuitive, but had below concerns.

Looking at the OPA implementation below, status.Errors is populated in only 1 of the 3 error cases. The other 2 cases set status.Errors to nil even though an error is being recorded. Based on the existing implementation, status.Code is the most reliable indicator as far as I can see.

I also understand that if the bundle_error constant is ever modified, we are in trouble. But then again, just relying on the code being non-empty is also not good if the code ever gets set in a non-erroneous scenario.

Status Error code segment, /plugins/bundle/status.go#L64-L97
const (
	errCode = "bundle_error"
)

func (s *Status) SetError(err error) {
	var (
		astErrors ast.Errors
		httpError download.HTTPError
	)
	switch {
	case err == nil:
		s.Code = ""
		s.HTTPCode = ""
		s.Message = ""
		s.Errors = nil

	case errors.As(err, &astErrors):
		s.Code = errCode
		s.HTTPCode = ""
		s.Message = types.MsgCompileModuleError
		s.Errors = make([]error, len(astErrors))
		for i := range astErrors {
			s.Errors[i] = astErrors[i]
		}

	case errors.As(err, &httpError):
		s.Code = errCode
		s.HTTPCode = json.Number(strconv.Itoa(httpError.StatusCode))
		s.Message = err.Error()
		s.Errors = nil

	default:
		s.Code = errCode
		s.HTTPCode = ""
		s.Message = err.Error()
		s.Errors = nil
	}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like we want to reach out to styra to make a proper error handling possible.
I also wonder about the typing here: Status.HTTPCode is a string and httpError is not an error — what the heck is this code?

Copy link
Collaborator

@Pushpalanka Pushpalanka Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will raise this in the OPA community and get back.

PS: open-policy-agent/opa#6983 raised with minimum changes that will help us here. Will take it forward based on review.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went ahead with the bundle_error constant as a change there will be communicated in OPA release notes.

if status.HTTPCode == "" {
failed <- formatStatusError(prefix, status)
return
}
code, err := status.HTTPCode.Int64()
if err == nil {
if code >= 400 && code < 500 && !isTemporaryError(code) {
// Fail for error codes in the range 400-500 excluding temporary errors
failed <- formatStatusError(prefix, status)
return
} else if code >= 500 {
// Do not fail for 5xx errors and keep retrying
return
}
}
if err != nil {
failed <- formatStatusError(prefix, status)
return
}
}
}

// formatStatusError builds the error reported for a failed status update.
//
// The message starts with prefix (the name of the reporting component) and
// lists the status' Name, Code, Message, HTTPCode and Errors fields.
func formatStatusError(prefix string, status bundle.Status) error {
	const layout = "%s failed: Name: %s, Code: %s, Message: %s, HTTPCode: %s, Errors: %v"

	return fmt.Errorf(
		layout,
		prefix,
		status.Name,
		status.Code,
		status.Message,
		status.HTTPCode,
		status.Errors,
	)
}
Loading
Loading