Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build test #16652

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability

var (
// TODO: make this configurable
defaultTargetRequestTimeout = 8 * time.Minute
defaultTargetRequestTimeout = 8 * time.Minute
defaultMaxParallelCapabilityExecuteRequests = int64(100)
)

func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don registrysyncer.DON, state *registrysyncer.LocalRegistry, remoteWorkflowDONs []registrysyncer.DON) error {
Expand Down Expand Up @@ -473,6 +474,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
idsToDONs,
w.dispatcher,
defaultTargetRequestTimeout,
defaultMaxParallelCapabilityExecuteRequests,
w.lggr,
), nil
}
Expand Down Expand Up @@ -505,6 +507,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
idsToDONs,
w.dispatcher,
defaultTargetRequestTimeout,
defaultMaxParallelCapabilityExecuteRequests,
w.lggr,
), nil
}
Expand Down
2 changes: 2 additions & 0 deletions core/capabilities/remote/executable/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
// Fixed identifiers shared by the tests in this package.
const (
stepReferenceID1 = "step1"
// workflowID1 and workflowID2 are two distinct workflow IDs, letting a test
// issue requests on behalf of different workflows.
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowID2 = "25c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce1"
// workflowExecutionID1 and workflowExecutionID2 are two distinct workflow execution IDs.
workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
workflowExecutionID2 = "85ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeee"
workflowOwnerID = "0xAA"
)

Expand Down
181 changes: 167 additions & 14 deletions core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"errors"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -26,6 +27,132 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

// Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecution_AllAtOnce
// issues requests for a slow workflow (one minute pause) followed by requests
// for a fast workflow (one second pause) against the same capability, using the
// AllAtOnce transmission schedule, and requires that every fast-workflow caller
// receives its response within a minute.
func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecution_AllAtOnce(t *testing.T) {
	ctx := testutils.Context(t)

	// workflowID1 is the slow workflow, workflowID2 the fast one.
	capability := &TestSlowExecutionCapability{
		workflowIDToPause: map[string]time.Duration{
			workflowID1: 1 * time.Minute,
			workflowID2: 1 * time.Second,
		},
	}

	const numWorkflowPeers = int64(10)
	var (
		callCount        int64
		testShuttingDown int32
	)

	responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
		// Responses that arrive after the test has seen everything it needs are ignored.
		if atomic.LoadInt32(&testShuttingDown) != 0 {
			return
		}
		if !assert.NoError(t, responseError) {
			return
		}

		if mp, err := response.Value.Unwrap(); assert.NoError(t, err) {
			assert.Equal(t, "1s", mp.(map[string]any)["response"].(string))
		}
		atomic.AddInt64(&callCount, 1)
	}

	transmissionSchedule, err := values.NewMap(map[string]any{
		"schedule":   transmission.Schedule_AllAtOnce,
		"deltaStage": "10ms",
	})
	require.NoError(t, err)

	timeOut := 10 * time.Minute

	// launch runs testRemoteExecutableCapability for one workflow ID with
	// waitForExecuteCalls set to false, so it does not wait for the Execute
	// calls to finish before returning.
	launch := func(workflowID string) {
		method := func(ctx context.Context, caller commoncap.ExecutableCapability) {
			executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID)
		}
		testRemoteExecutableCapability(ctx, t, capability, int(numWorkflowPeers), 9, timeOut, 10,
			9, timeOut, method, false)
	}
	launch(workflowID1)
	launch(workflowID2)

	require.Eventually(t, func() bool {
		if atomic.LoadInt64(&callCount) != numWorkflowPeers {
			return false
		}

		// Flag shutdown so that responses still in flight are dropped by responseTest.
		atomic.AddInt32(&testShuttingDown, 1)
		return true
	}, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s delay capability")
}

// Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecution_OneAtATime
// issues requests for a slow workflow (one minute pause) followed by requests
// for a fast workflow (one second pause) against the same capability, using the
// OneAtATime transmission schedule, and requires that every fast-workflow caller
// receives its response within a minute.
func Test_RemoteExecutableCapability_ExecutionNotBlockedBySlowCapabilityExecution_OneAtATime(t *testing.T) {
	ctx := testutils.Context(t)

	// workflowID1 is the slow workflow, workflowID2 the fast one.
	capability := &TestSlowExecutionCapability{
		workflowIDToPause: map[string]time.Duration{
			workflowID1: 1 * time.Minute,
			workflowID2: 1 * time.Second,
		},
	}

	const numWorkflowPeers = int64(10)
	var (
		callCount        int64
		testShuttingDown int32
	)

	responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
		// Responses that arrive after the test has seen everything it needs are ignored.
		if atomic.LoadInt32(&testShuttingDown) != 0 {
			return
		}
		if !assert.NoError(t, responseError) {
			return
		}

		if mp, err := response.Value.Unwrap(); assert.NoError(t, err) {
			assert.Equal(t, "1s", mp.(map[string]any)["response"].(string))
		}
		atomic.AddInt64(&callCount, 1)
	}

	transmissionSchedule, err := values.NewMap(map[string]any{
		"schedule":   transmission.Schedule_OneAtATime,
		"deltaStage": "10ms",
	})
	require.NoError(t, err)

	timeOut := 10 * time.Minute

	// launch runs testRemoteExecutableCapability for one workflow ID with
	// waitForExecuteCalls set to false, so it does not wait for the Execute
	// calls to finish before returning.
	launch := func(workflowID string) {
		method := func(ctx context.Context, caller commoncap.ExecutableCapability) {
			executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID)
		}
		testRemoteExecutableCapability(ctx, t, capability, int(numWorkflowPeers), 9, timeOut, 10,
			9, timeOut, method, false)
	}
	launch(workflowID1)
	launch(workflowID2)

	require.Eventually(t, func() bool {
		if atomic.LoadInt64(&callCount) != numWorkflowPeers {
			return false
		}

		// Flag shutdown so that responses still in flight are dropped by responseTest.
		atomic.AddInt32(&testShuttingDown, 1)
		return true
	}, 1*time.Minute, 10*time.Millisecond, "require 10 callbacks from 1s delay capability")
}

func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) {
ctx := testutils.Context(t)

Expand All @@ -49,20 +176,20 @@ func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) {
capability := &TestCapability{}

method := func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, responseTest)
executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID1)
}
testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method, true)

transmissionSchedule, err = values.NewMap(map[string]any{
"schedule": transmission.Schedule_AllAtOnce,
"deltaStage": "10ms",
})
require.NoError(t, err)
method = func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, responseTest)
executeCapability(ctx, t, caller, transmissionSchedule, responseTest, workflowID1)
}

testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method)
testRemoteExecutableCapability(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, method, true)
}

func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) {
Expand All @@ -81,11 +208,11 @@ func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) {
methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "error executing request: failed to execute capability", responseError.Error())
})
}, workflowID1)
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, method)
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, method, true)
}
}

Expand All @@ -105,18 +232,18 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) {
methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "error executing request: failed to execute capability", responseError.Error())
})
}, workflowID1)
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 1*time.Second, 10, 9, 10*time.Minute,
method)
method, true)
}
}

func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlying commoncap.ExecutableCapability, numWorkflowPeers int, workflowDonF uint8, workflowNodeTimeout time.Duration,
numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration,
method func(ctx context.Context, caller commoncap.ExecutableCapability)) {
method func(ctx context.Context, caller commoncap.ExecutableCapability), waitForExecuteCalls bool) {
lggr := logger.TestLogger(t)

capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers)
Expand Down Expand Up @@ -166,7 +293,7 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin
capabilityPeer := capabilityPeers[i]
capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer)
capabilityNode := executable.NewServer(&commoncap.RemoteExecutableConfig{RequestHashExcludedAttributes: []string{}}, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher,
capabilityNodeResponseTimeout, lggr)
capabilityNodeResponseTimeout, 10, lggr)
servicetest.Run(t, capabilityNode)
broker.RegisterReceiverNode(capabilityPeer, capabilityNode)
capabilityNodes[i] = capabilityNode
Expand All @@ -192,8 +319,9 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin
method(ctx, caller)
}(caller)
}

wg.Wait()
if waitForExecuteCalls {
wg.Wait()
}
}

type testAsyncMessageBroker struct {
Expand Down Expand Up @@ -336,6 +464,30 @@ func (t TestCapability) Execute(ctx context.Context, request commoncap.Capabilit
}, nil
}

// TestSlowExecutionCapability is a test capability whose Execute call pauses
// for a per-workflow duration before responding.
type TestSlowExecutionCapability struct {
abstractTestCapability
// workflowIDToPause maps a workflow ID to how long Execute pauses for
// requests carrying that workflow ID.
workflowIDToPause map[string]time.Duration
}

// Execute simulates a slow capability. It looks up the pause configured for the
// request's workflow ID, waits for that long, and then responds with a map whose
// "response" entry is the pause formatted as a duration string (e.g. "1s").
//
// The wait is abandoned as soon as ctx is done, in which case ctx.Err() is
// returned; this keeps long pauses from pinning goroutines after the caller has
// given up. Execute panics if the workflow ID has no configured pause, since
// that indicates a mistake in the test setup.
func (t *TestSlowExecutionCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
	delay, ok := t.workflowIDToPause[request.Metadata.WorkflowID]
	if !ok {
		panic("workflowID not found")
	}

	// Use a timer rather than time.Sleep so the pause can be interrupted by ctx.
	timer := time.NewTimer(delay)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-ctx.Done():
		return commoncap.CapabilityResponse{}, ctx.Err()
	}

	response, err := values.NewMap(map[string]any{"response": delay.String()})
	if err != nil {
		return commoncap.CapabilityResponse{}, err
	}
	return commoncap.CapabilityResponse{
		Value: response,
	}, nil
}

type TestErrorCapability struct {
abstractTestCapability
}
Expand Down Expand Up @@ -390,7 +542,8 @@ func libp2pMagic() []byte {
return []byte{0x00, 0x24, 0x08, 0x01, 0x12, 0x20}
}

func executeCapability(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, response commoncap.CapabilityResponse, responseError error)) {
func executeCapability(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, response commoncap.CapabilityResponse, responseError error),
workflowID string) {
executeInputs, err := values.NewMap(
map[string]any{
"executeValue1": "aValue1",
Expand All @@ -400,7 +553,7 @@ func executeCapability(ctx context.Context, t *testing.T, caller commoncap.Execu
response, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
WorkflowID: workflowID,
WorkflowExecutionID: workflowExecutionID1,
},
Config: transmissionSchedule,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package executable

import (
"context"
"errors"
"sync"

"golang.org/x/sync/semaphore"

"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// ErrMaxParallelExecutionLimitReached is returned by ExecuteTask when no
// semaphore slot can be acquired because the maximum number of tasks is already
// running.
var ErrMaxParallelExecutionLimitReached = errors.New("parallel execution limit reached")

// parallelExecutionLimiter runs tasks on their own goroutines while capping how
// many may be running at the same time.
type parallelExecutionLimiter struct {
// maxParallelTasks is the limit the limiter was constructed with; it is also
// the capacity given to sem.
maxParallelTasks int64
// sem holds one unit per running task; ExecuteTask rejects a task when it
// cannot acquire a unit without blocking.
sem *semaphore.Weighted
// wg tracks the task goroutines so that Close can wait for them to finish.
wg sync.WaitGroup
// stopChan is closed by Close; the context passed to each task is derived from it.
stopChan services.StopChan
}

// newParallelExecutionLimiter builds a limiter whose semaphore admits at most
// maxParallelTasks concurrently running tasks and whose stop channel is open.
func newParallelExecutionLimiter(maxParallelTasks int64) *parallelExecutionLimiter {
	limiter := new(parallelExecutionLimiter)
	limiter.maxParallelTasks = maxParallelTasks
	limiter.sem = semaphore.NewWeighted(maxParallelTasks)
	limiter.stopChan = make(services.StopChan)
	return limiter
}

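// ExecuteTask starts the task on its own goroutine and returns immediately. It returns
// ErrMaxParallelExecutionLimitReached, without running the task, if the maximum number of parallel executing
// tasks has already been reached. The task receives a context derived from both the given context and the
// limiter's stop channel.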
func (t *parallelExecutionLimiter) ExecuteTask(ctx context.Context, task func(ctx context.Context)) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are we calling this?

if !t.sem.TryAcquire(1) {
return ErrMaxParallelExecutionLimitReached
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of failing? What are we going to do if this starts failing in production?

}

t.wg.Add(1)
go func() {
defer t.sem.Release(1)
defer t.wg.Done()

ctxWithStop, cancel := t.stopChan.Ctx(ctx)
task(ctxWithStop)
cancel()
}()

return nil
}

// Close closes stopChan and then blocks until every task goroutine started by
// ExecuteTask has returned. Each task's context is obtained from stopChan.Ctx,
// so closing the channel presumably cancels in-flight tasks (confirm against
// services.StopChan). Close must be called at most once: closing an
// already-closed channel panics.
func (t *parallelExecutionLimiter) Close() {
close(t.stopChan)
t.wg.Wait()
}
Loading
Loading