From 6449a0d6dcda086c8c4be67260ccda8c73c19a8a Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Sat, 1 Mar 2025 15:35:35 +0000 Subject: [PATCH 1/6] async execute of capability with parallel execution limit --- .../remote/executable/client_test.go | 2 + .../remote/executable/endtoend_test.go | 26 +++- .../executable/parallel_execution_limiter.go | 54 +++++++++ .../parallel_execution_limiter_test.go | 114 ++++++++++++++++++ core/capabilities/remote/executable/server.go | 22 +++- .../remote/executable/server_test.go | 54 ++++++++- 6 files changed, 265 insertions(+), 7 deletions(-) create mode 100644 core/capabilities/remote/executable/parallel_execution_limiter.go create mode 100644 core/capabilities/remote/executable/parallel_execution_limiter_test.go diff --git a/core/capabilities/remote/executable/client_test.go b/core/capabilities/remote/executable/client_test.go index 20458eb3531..0c24f804400 100644 --- a/core/capabilities/remote/executable/client_test.go +++ b/core/capabilities/remote/executable/client_test.go @@ -25,7 +25,9 @@ import ( const ( stepReferenceID1 = "step1" workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowID2 = "25c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce1" workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed" + workflowExecutionID2 = "85ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeee" workflowOwnerID = "0xAA" ) diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 5df29b937d3..219f16b06a1 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -166,7 +166,7 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) capabilityNode := 
executable.NewServer(&commoncap.RemoteExecutableConfig{RequestHashExcludedAttributes: []string{}}, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout, lggr) + capabilityNodeResponseTimeout, 10, lggr) servicetest.Run(t, capabilityNode) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode @@ -336,6 +336,30 @@ func (t TestCapability) Execute(ctx context.Context, request commoncap.Capabilit }, nil } +type TestSlowExecutionCapability struct { + abstractTestCapability + workflowIDToPause map[string]time.Duration +} + +func (t *TestSlowExecutionCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) { + var delay time.Duration + if d, ok := t.workflowIDToPause[request.Metadata.WorkflowID]; ok { + delay = d + } else { + panic("workflowID not found") + } + + time.Sleep(delay) + + response, err := values.NewMap(map[string]any{"response": delay.String()}) + if err != nil { + return commoncap.CapabilityResponse{}, err + } + return commoncap.CapabilityResponse{ + Value: response, + }, nil +} + type TestErrorCapability struct { abstractTestCapability } diff --git a/core/capabilities/remote/executable/parallel_execution_limiter.go b/core/capabilities/remote/executable/parallel_execution_limiter.go new file mode 100644 index 00000000000..ff62ad6146e --- /dev/null +++ b/core/capabilities/remote/executable/parallel_execution_limiter.go @@ -0,0 +1,54 @@ +package executable + +import ( + "context" + "errors" + "sync" + + "golang.org/x/sync/semaphore" + + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +var ErrMaxParallelExecutionLimitReached = errors.New("parallel execution limit reached") + +type parallelExecutionLimiter struct { + maxParallelTasks int64 + sem *semaphore.Weighted + wg sync.WaitGroup + stopChan services.StopChan +} + +func newParallelExecutionLimiter(maxParallelTasks int64) 
*parallelExecutionLimiter { + return &parallelExecutionLimiter{ + maxParallelTasks: maxParallelTasks, + sem: semaphore.NewWeighted(maxParallelTasks), + stopChan: make(services.StopChan), + } +} + +// ExecuteTask executes a task in parallel, returning ErrMaxParallelExecutionLimitReached if the maximum number of +// +// parallel executing tasks has been reached. The task is executed with the given context. +func (t *parallelExecutionLimiter) ExecuteTask(ctx context.Context, task func(ctx context.Context)) error { + if !t.sem.TryAcquire(1) { + return ErrMaxParallelExecutionLimitReached + } + + t.wg.Add(1) + go func() { + defer t.sem.Release(1) + defer t.wg.Done() + + ctxWithStop, cancel := t.stopChan.Ctx(ctx) + task(ctxWithStop) + cancel() + }() + + return nil +} + +func (t *parallelExecutionLimiter) Close() { + close(t.stopChan) + t.wg.Wait() +} diff --git a/core/capabilities/remote/executable/parallel_execution_limiter_test.go b/core/capabilities/remote/executable/parallel_execution_limiter_test.go new file mode 100644 index 00000000000..43c18447eed --- /dev/null +++ b/core/capabilities/remote/executable/parallel_execution_limiter_test.go @@ -0,0 +1,114 @@ +package executable + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_CancellingContext_StopsTask(t *testing.T) { + tp := newParallelExecutionLimiter(10) + + var cancelFns []context.CancelFunc + + var counter int32 + for i := 0; i < 10; i++ { + + ctx, cancel := context.WithCancel(context.Background()) + cancelFns = append(cancelFns, cancel) + err := tp.ExecuteTask(ctx, func(ctx context.Context) { + atomic.AddInt32(&counter, 1) + select { + case <-ctx.Done(): + } + atomic.AddInt32(&counter, -1) + }) + + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&counter) == 10 + }, 10*time.Second, 10*time.Millisecond) + + for _, cancel := range cancelFns { + cancel() + } + + 
assert.Eventually(t, func() bool { + return atomic.LoadInt32(&counter) == 0 + }, 10*time.Second, 10*time.Millisecond) + + tp.Close() +} + +func TestThreadPool_Full_Error(t *testing.T) { + tp := newParallelExecutionLimiter(3) + + for i := 0; i < 3; i++ { + err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { + select { + case <-ctx.Done(): + } + }) + require.NoError(t, err) + } + + err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { + }) + assert.Error(t, err) + assert.Equal(t, ErrMaxParallelExecutionLimitReached, err) + tp.Close() +} + +func TestThreadPool_ExecutingTasks(t *testing.T) { + tp := newParallelExecutionLimiter(10) + + var counter int32 + for i := 0; i < 10; i++ { + err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { + atomic.AddInt32(&counter, 1) + select { + case <-ctx.Done(): + } + atomic.AddInt32(&counter, -1) + }) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&counter) == 10 + }, 10*time.Second, 10*time.Millisecond) + +} + +func TestThreadPool_Stop(t *testing.T) { + tp := newParallelExecutionLimiter(10) + + var counter int32 + for i := 0; i < 10; i++ { + err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { + atomic.AddInt32(&counter, 1) + select { + case <-ctx.Done(): + } + atomic.AddInt32(&counter, -1) + }) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&counter) == 10 + }, 10*time.Second, 10*time.Millisecond) + + tp.Close() + + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&counter) == 0 + }, 10*time.Second, 10*time.Millisecond) + +} diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index 4aeed012a0e..2b88b38815b 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -48,6 +48,8 @@ type server struct { receiveLock sync.Mutex stopCh 
services.StopChan wg sync.WaitGroup + + parallelExecutionLimiter *parallelExecutionLimiter } var _ types.Receiver = &server{} @@ -60,7 +62,9 @@ type requestAndMsgID struct { func NewServer(remoteExecutableConfig *commoncap.RemoteExecutableConfig, peerID p2ptypes.PeerID, underlying commoncap.ExecutableCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, - workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *server { + workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, + maxParallelRequests int64, + lggr logger.Logger) *server { if remoteExecutableConfig == nil { lggr.Info("no remote config provided, using default values") remoteExecutableConfig = &commoncap.RemoteExecutableConfig{} @@ -80,6 +84,8 @@ func NewServer(remoteExecutableConfig *commoncap.RemoteExecutableConfig, peerID lggr: lggr.Named("ExecutableCapabilityServer"), stopCh: make(services.StopChan), + + parallelExecutionLimiter: newParallelExecutionLimiter(maxParallelRequests), } } @@ -94,6 +100,7 @@ func (r *server) Start(ctx context.Context) error { } ticker := time.NewTicker(tickerInterval) defer ticker.Stop() + r.lggr.Info("executable capability server started") for { select { @@ -110,6 +117,8 @@ func (r *server) Start(ctx context.Context) error { func (r *server) Close() error { return r.StopOnce(r.Name(), func() error { + r.parallelExecutionLimiter.Close() + close(r.stopCh) r.wg.Wait() r.lggr.Info("executable capability server closed") @@ -191,10 +200,13 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { reqAndMsgID := r.requestIDToRequest[requestID] - err = reqAndMsgID.request.OnMessage(ctx, msg) - if err != nil { - r.lggr.Errorw("request failed to OnMessage new message", "messageID", reqAndMsgID.messageID, "err", err) - } + go func() { + + err = reqAndMsgID.request.OnMessage(ctx, msg) + if err != nil { + r.lggr.Errorw("request failed to 
OnMessage new message", "messageID", reqAndMsgID.messageID, "err", err) + } + }() } func (r *server) getMessageHash(msg *types.MessageBody) ([32]byte, error) { diff --git a/core/capabilities/remote/executable/server_test.go b/core/capabilities/remote/executable/server_test.go index 7d0b776f28e..83d104c54cc 100644 --- a/core/capabilities/remote/executable/server_test.go +++ b/core/capabilities/remote/executable/server_test.go @@ -20,6 +20,58 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) +func Test_Server_Execute_SlowCapabilityExecutionDoesNotImpactSubsequentCall(t *testing.T) { + ctx := testutils.Context(t) + + numCapabilityPeers := 4 + + workflowIDToPause := map[string]time.Duration{} + workflowIDToPause[workflowID1] = 1 * time.Minute + workflowIDToPause[workflowID2] = 1 * time.Second + + callers, srvcs := testRemoteExecutableCapabilityServer(ctx, t, &commoncap.RemoteExecutableConfig{}, &TestSlowExecutionCapability{workflowIDToPause: workflowIDToPause}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) + + for _, caller := range callers { + _, err := caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, + }, + }) + require.NoError(t, err) + } + + for _, caller := range callers { + _, err := caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID2, + WorkflowExecutionID: workflowExecutionID2, + }, + }) + require.NoError(t, err) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_OK, msg.Error) + + capabilityResponse, err := pb.UnmarshalCapabilityResponse(msg.Payload) + require.NoError(t, err) + val := capabilityResponse.Value.Underlying["response"] + + var valAsStr string + val.UnwrapTo(&valAsStr) + + assert.Equal(t, "1s", 
valAsStr) + } + } + + closeServices(t, srvcs) +} + func Test_Server_DefaultExcludedAttributes(t *testing.T) { ctx := testutils.Context(t) @@ -226,7 +278,7 @@ func testRemoteExecutableCapabilityServer(ctx context.Context, t *testing.T, capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) capabilityNode := executable.NewServer(config, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout, lggr) + capabilityNodeResponseTimeout, 10, lggr) require.NoError(t, capabilityNode.Start(ctx)) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode From fcc4b173572d78bff9b551063cdee9c187019866 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Sat, 1 Mar 2025 15:38:36 +0000 Subject: [PATCH 2/6] async execute of capability with parallel execution limit --- core/capabilities/launcher.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index d788b688bc0..198f524a635 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -407,7 +407,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability var ( // TODO: make this configurable - defaultTargetRequestTimeout = 8 * time.Minute + defaultTargetRequestTimeout = 8 * time.Minute + defaultMaxParallelCapabilityExecuteRequests = int64(100) ) func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don registrysyncer.DON, state *registrysyncer.LocalRegistry, remoteWorkflowDONs []registrysyncer.DON) error { @@ -473,6 +474,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee idsToDONs, w.dispatcher, defaultTargetRequestTimeout, + defaultMaxParallelCapabilityExecuteRequests, w.lggr, ), nil } @@ -505,6 +507,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee 
idsToDONs, w.dispatcher, defaultTargetRequestTimeout, + defaultMaxParallelCapabilityExecuteRequests, w.lggr, ), nil } From 35ad2eace7d7c341efa6671c4829c183f0d0e9f5 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Sat, 1 Mar 2025 20:57:53 +0000 Subject: [PATCH 3/6] additional end to end tests --- .../remote/executable/endtoend_test.go | 157 ++++++++++++++++-- 1 file changed, 144 insertions(+), 13 deletions(-) diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 219f16b06a1..0aa9bfff2ae 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "errors" "sync" + "sync/atomic" "testing" "time" @@ -26,6 +27,134 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) +func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecution_AllAtOnce(t *testing.T) { + ctx := testutils.Context(t) + + workflowIDToPause := map[string]time.Duration{} + workflowIDToPause[workflowID1] = 1 * time.Minute + workflowIDToPause[workflowID2] = 1 * time.Second + capability := &TestSlowExecutionCapability{ + workflowIDToPause: workflowIDToPause, + } + + var callCount int64 + + numWorkflowPeers := int64(10) + var testShuttingDown int32 + + responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) { + shuttingDown := atomic.LoadInt32(&testShuttingDown) + if shuttingDown != 0 { + return + } + + if assert.NoError(t, responseError) { + mp, err := response.Value.Unwrap() + if assert.NoError(t, err) { + assert.Equal(t, "1s", mp.(map[string]any)["response"].(string)) + } + + atomic.AddInt64(&callCount, 1) + } + } + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + timeOut := 10 * time.Minute + + method := func(ctx 
context.Context, caller commoncap.ExecutableCapability) { + executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID1) + } + testRemoteExecutableCapability(ctx, t, capability, int(numWorkflowPeers), 9, timeOut, 10, + 9, timeOut, method, false) + + method = func(ctx context.Context, caller commoncap.ExecutableCapability) { + executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID2) + } + testRemoteExecutableCapability(ctx, t, capability, int(numWorkflowPeers), 9, timeOut, 10, + 9, timeOut, method, false) + + require.Eventually(t, func() bool { + count := atomic.LoadInt64(&callCount) + + if count == numWorkflowPeers { + atomic.AddInt32(&testShuttingDown, 1) + return true + } + + return false + + }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s task") +} + +func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecution_OneAtATime(t *testing.T) { + ctx := testutils.Context(t) + + workflowIDToPause := map[string]time.Duration{} + workflowIDToPause[workflowID1] = 1 * time.Minute + workflowIDToPause[workflowID2] = 1 * time.Second + capability := &TestSlowExecutionCapability{ + workflowIDToPause: workflowIDToPause, + } + + var callCount int64 + + numWorkflowPeers := int64(10) + var testShuttingDown int32 + + responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) { + shuttingDown := atomic.LoadInt32(&testShuttingDown) + if shuttingDown != 0 { + return + } + + if assert.NoError(t, responseError) { + mp, err := response.Value.Unwrap() + if assert.NoError(t, err) { + assert.Equal(t, "1s", mp.(map[string]any)["response"].(string)) + } + + atomic.AddInt64(&callCount, 1) + } + } + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + timeOut := 10 * time.Minute + + method := func(ctx context.Context, caller commoncap.ExecutableCapability) { + 
executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID1) + } + testRemoteExecutableCapability(ctx, t, capability, int(numWorkflowPeers), 9, timeOut, 10, + 9, timeOut, method, false) + + method = func(ctx context.Context, caller commoncap.ExecutableCapability) { + executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID2) + } + testRemoteExecutableCapability(ctx, t, capability, int(numWorkflowPeers), 9, timeOut, 10, + 9, timeOut, method, false) + + require.Eventually(t, func() bool { + count := atomic.LoadInt64(&callCount) + + if count == numWorkflowPeers { + atomic.AddInt32(&testShuttingDown, 1) + return true + } + + return false + + }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s task") +} + func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) { ctx := testutils.Context(t) @@ -49,9 +178,9 @@ func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) { capability := &TestCapability{} method := func(ctx context.Context, caller commoncap.ExecutableCapability) { - executeCapability(ctx, t, caller, transmissionSchedule, responseTest) + executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID1) } - testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method) + testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method, true) transmissionSchedule, err = values.NewMap(map[string]any{ "schedule": transmission.Schedule_AllAtOnce, @@ -59,10 +188,10 @@ func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) { }) require.NoError(t, err) method = func(ctx context.Context, caller commoncap.ExecutableCapability) { - executeCapability(ctx, t, caller, transmissionSchedule, responseTest) + executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID1) } - testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method) + 
testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method, true) } func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) { @@ -81,11 +210,11 @@ func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) { methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { assert.Equal(t, "error executing request: failed to execute capability", responseError.Error()) - }) + }, workflowID1) }) for _, method := range methods { - testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, method) + testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, method, true) } } @@ -105,18 +234,18 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) { methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { assert.Equal(t, "error executing request: failed to execute capability", responseError.Error()) - }) + }, workflowID1) }) for _, method := range methods { testRemoteExecutableCapability(ctx, t, capability, 10, 9, 1*time.Second, 10, 9, 10*time.Minute, - method) + method, true) } } func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlying commoncap.ExecutableCapability, numWorkflowPeers int, workflowDonF uint8, workflowNodeTimeout time.Duration, numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration, - method func(ctx context.Context, caller commoncap.ExecutableCapability)) { + method func(ctx context.Context, caller commoncap.ExecutableCapability), waitForExecuteCalls bool) { lggr := logger.TestLogger(t) capabilityPeers := 
make([]p2ptypes.PeerID, numCapabilityPeers) @@ -192,8 +321,9 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin method(ctx, caller) }(caller) } - - wg.Wait() + if waitForExecuteCalls { + wg.Wait() + } } type testAsyncMessageBroker struct { @@ -414,7 +544,8 @@ func libp2pMagic() []byte { return []byte{0x00, 0x24, 0x08, 0x01, 0x12, 0x20} } -func executeCapability(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, response commoncap.CapabilityResponse, responseError error)) { +func executeCapability(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, response commoncap.CapabilityResponse, responseError error), + workflowID string) { executeInputs, err := values.NewMap( map[string]any{ "executeValue1": "aValue1", @@ -424,7 +555,7 @@ func executeCapability(ctx context.Context, t *testing.T, caller commoncap.Execu response, err := caller.Execute(ctx, commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: workflowID1, + WorkflowID: workflowID, WorkflowExecutionID: workflowExecutionID1, }, Config: transmissionSchedule, From 49c1891e180a27d19e4db96fd5340cfc3a6049d4 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Sat, 1 Mar 2025 21:02:12 +0000 Subject: [PATCH 4/6] lint --- core/capabilities/remote/executable/endtoend_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 0aa9bfff2ae..b704b4ec398 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -88,7 +88,7 @@ func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecutio return false - }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s task") 
+ }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s delay capability") } func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecution_OneAtATime(t *testing.T) { @@ -152,7 +152,7 @@ func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecutio return false - }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s task") + }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s delay capability") } func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) { From c42dba012120a6637d1539efd4534a444d16ca21 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Sun, 2 Mar 2025 10:23:50 -0500 Subject: [PATCH 5/6] lint --- .../executable/parallel_execution_limiter.go | 3 +-- .../parallel_execution_limiter_test.go | 23 +++++++------------ core/capabilities/remote/executable/server.go | 1 - .../remote/executable/server_test.go | 3 ++- 4 files changed, 11 insertions(+), 19 deletions(-) diff --git a/core/capabilities/remote/executable/parallel_execution_limiter.go b/core/capabilities/remote/executable/parallel_execution_limiter.go index ff62ad6146e..d745382ff5c 100644 --- a/core/capabilities/remote/executable/parallel_execution_limiter.go +++ b/core/capabilities/remote/executable/parallel_execution_limiter.go @@ -28,8 +28,7 @@ func newParallelExecutionLimiter(maxParallelTasks int64) *parallelExecutionLimit } // ExecuteTask executes a task in parallel, returning ErrMaxParallelExecutionLimitReached if the maximum number of -// -// parallel executing tasks has been reached. The task is executed with the given context. +// parallel executing tasks has been reached. The task is executed with the given context. 
func (t *parallelExecutionLimiter) ExecuteTask(ctx context.Context, task func(ctx context.Context)) error { if !t.sem.TryAcquire(1) { return ErrMaxParallelExecutionLimitReached diff --git a/core/capabilities/remote/executable/parallel_execution_limiter_test.go b/core/capabilities/remote/executable/parallel_execution_limiter_test.go index 43c18447eed..01d706bf1cf 100644 --- a/core/capabilities/remote/executable/parallel_execution_limiter_test.go +++ b/core/capabilities/remote/executable/parallel_execution_limiter_test.go @@ -22,9 +22,7 @@ func Test_CancellingContext_StopsTask(t *testing.T) { cancelFns = append(cancelFns, cancel) err := tp.ExecuteTask(ctx, func(ctx context.Context) { atomic.AddInt32(&counter, 1) - select { - case <-ctx.Done(): - } + <-ctx.Done() atomic.AddInt32(&counter, -1) }) @@ -46,35 +44,31 @@ func Test_CancellingContext_StopsTask(t *testing.T) { tp.Close() } -func TestThreadPool_Full_Error(t *testing.T) { +func Test_ParallelExecutionLimitReached(t *testing.T) { tp := newParallelExecutionLimiter(3) for i := 0; i < 3; i++ { err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { - select { - case <-ctx.Done(): - } + <-ctx.Done() }) require.NoError(t, err) } err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { }) - assert.Error(t, err) + require.Error(t, err) assert.Equal(t, ErrMaxParallelExecutionLimitReached, err) tp.Close() } -func TestThreadPool_ExecutingTasks(t *testing.T) { +func Test_ExecutingMultipleTasksInParallel(t *testing.T) { tp := newParallelExecutionLimiter(10) var counter int32 for i := 0; i < 10; i++ { err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { atomic.AddInt32(&counter, 1) - select { - case <-ctx.Done(): - } + <-ctx.Done() atomic.AddInt32(&counter, -1) }) require.NoError(t, err) @@ -86,15 +80,14 @@ func TestThreadPool_ExecutingTasks(t *testing.T) { } -func TestThreadPool_Stop(t *testing.T) { +func Test_StopExecutingMultipleParallelTasks(t *testing.T) { tp := 
newParallelExecutionLimiter(10) var counter int32 for i := 0; i < 10; i++ { err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { atomic.AddInt32(&counter, 1) - select { - case <-ctx.Done(): + if <-ctx.Done(); true { } atomic.AddInt32(&counter, -1) }) diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index 2b88b38815b..d3dd2249e2d 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -201,7 +201,6 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { reqAndMsgID := r.requestIDToRequest[requestID] go func() { - err = reqAndMsgID.request.OnMessage(ctx, msg) if err != nil { r.lggr.Errorw("request failed to OnMessage new message", "messageID", reqAndMsgID.messageID, "err", err) diff --git a/core/capabilities/remote/executable/server_test.go b/core/capabilities/remote/executable/server_test.go index 83d104c54cc..cf0e1ade43a 100644 --- a/core/capabilities/remote/executable/server_test.go +++ b/core/capabilities/remote/executable/server_test.go @@ -63,7 +63,8 @@ func Test_Server_Execute_SlowCapabilityExecutionDoesNotImpactSubsequentCall(t *t val := capabilityResponse.Value.Underlying["response"] var valAsStr string - val.UnwrapTo(&valAsStr) + err = val.UnwrapTo(&valAsStr) + require.NoError(t, err) assert.Equal(t, "1s", valAsStr) } From c81b82091bfac4301df00e5a75be7731f2440924 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Sun, 2 Mar 2025 10:41:46 -0500 Subject: [PATCH 6/6] lint --- core/capabilities/remote/executable/endtoend_test.go | 8 +++----- .../remote/executable/parallel_execution_limiter_test.go | 6 +----- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index b704b4ec398..8c130e5ae49 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ 
b/core/capabilities/remote/executable/endtoend_test.go @@ -87,7 +87,6 @@ func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecutio } return false - }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s delay capability") } @@ -151,7 +150,6 @@ func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecutio } return false - }, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s delay capability") } @@ -473,9 +471,9 @@ type TestSlowExecutionCapability struct { func (t *TestSlowExecutionCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) { var delay time.Duration - if d, ok := t.workflowIDToPause[request.Metadata.WorkflowID]; ok { - delay = d - } else { + + delay, ok := t.workflowIDToPause[request.Metadata.WorkflowID] + if !ok { panic("workflowID not found") } diff --git a/core/capabilities/remote/executable/parallel_execution_limiter_test.go b/core/capabilities/remote/executable/parallel_execution_limiter_test.go index 01d706bf1cf..33b94c5d906 100644 --- a/core/capabilities/remote/executable/parallel_execution_limiter_test.go +++ b/core/capabilities/remote/executable/parallel_execution_limiter_test.go @@ -17,7 +17,6 @@ func Test_CancellingContext_StopsTask(t *testing.T) { var counter int32 for i := 0; i < 10; i++ { - ctx, cancel := context.WithCancel(context.Background()) cancelFns = append(cancelFns, cancel) err := tp.ExecuteTask(ctx, func(ctx context.Context) { @@ -77,7 +76,6 @@ func Test_ExecutingMultipleTasksInParallel(t *testing.T) { assert.Eventually(t, func() bool { return atomic.LoadInt32(&counter) == 10 }, 10*time.Second, 10*time.Millisecond) - } func Test_StopExecutingMultipleParallelTasks(t *testing.T) { @@ -87,8 +85,7 @@ func Test_StopExecutingMultipleParallelTasks(t *testing.T) { for i := 0; i < 10; i++ { err := tp.ExecuteTask(context.Background(), func(ctx context.Context) { atomic.AddInt32(&counter, 1) - if <-ctx.Done(); 
true { - } + <-ctx.Done() atomic.AddInt32(&counter, -1) }) require.NoError(t, err) @@ -103,5 +100,4 @@ func Test_StopExecutingMultipleParallelTasks(t *testing.T) { assert.Eventually(t, func() bool { return atomic.LoadInt32(&counter) == 0 }, 10*time.Second, 10*time.Millisecond) - }