Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PauseActivity implementation #6752

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
3,831 changes: 1,926 additions & 1,905 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1,933 changes: 973 additions & 960 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

1,065 changes: 538 additions & 527 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ message SyncActivityRequest {
google.protobuf.Timestamp last_attempt_complete_time = 19;
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;
ychebotarev marked this conversation as resolved.
Show resolved Hide resolved
}

message SyncActivitiesRequest {
Expand Down Expand Up @@ -647,6 +649,9 @@ message ActivitySyncInfo {
google.protobuf.Timestamp last_attempt_complete_time = 19;
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;

}

message SyncActivityResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ message ActivityInfo {
// It monotonically increments when the activity options are changed.
// It is used to check if activity related tasks are still relevant to their corresponding state machine.
int32 stamp = 41;

// Paused state. When activity is paused it will not advance until unpaused.
// Iw will not be scheduled, timer tasks will not be processed, etc.
// Note: it still can be cancelled/completed.
bool paused = 42;
}

// timer_map column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ message SyncActivityTaskAttributes {
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
// It monotonically increments when the activity options are changed.
int32 stamp = 20;
// Flag indicating whether the activity is currently paused.
bool paused = 21;
}

message HistoryTaskAttributes {
Expand Down
46 changes: 40 additions & 6 deletions service/history/api/pauseactivity/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,51 @@ package pauseactivity
import (
"context"

"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
_ context.Context,
_ *historyservice.PauseActivityRequest,
_ shard.Context,
_ api.WorkflowConsistencyChecker,
ctx context.Context,
request *historyservice.PauseActivityRequest,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.PauseActivityResponse, retError error) {
return nil, serviceerror.NewUnimplemented("PauseActivity is not supported yet")
err := api.GetAndUpdateWorkflowWithNew(
ctx,
nil,
definition.NewWorkflowKey(
request.NamespaceId,
request.GetFrontendRequest().GetWorkflowId(),
request.GetFrontendRequest().GetRunId(),
),
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
frontendRequest := request.GetFrontendRequest()
activityId := frontendRequest.GetActivityId()

var err error

err = workflow.PauseActivityById(mutableState, activityId)
if err != nil {
return nil, err
}
return &api.UpdateWorkflowAction{
Noop: false,
CreateWorkflowTask: false,
}, nil
},
nil,
shardContext,
workflowConsistencyChecker,
)

if err != nil {
return nil, err
}

return &historyservice.PauseActivityResponse{}, nil
}
139 changes: 139 additions & 0 deletions service/history/api/pauseactivity/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package pauseactivity

import (
"testing"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
historyspb "go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
"go.uber.org/mock/gomock"
)

type (
pauseActivitySuite struct {
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.ContextTest
mockEventsCache *events.MockCache
mockNamespaceCache *namespace.MockRegistry
mockTaskGenerator *workflow.MockTaskGenerator
mockMutableState *workflow.MockMutableState
mockClusterMetadata *cluster.MockMetadata

executionInfo *persistencespb.WorkflowExecutionInfo

validator *api.CommandAttrValidator

logger log.Logger
}
)

func TestActivityOptionsSuite(t *testing.T) {
s := new(pauseActivitySuite)
suite.Run(t, s)
}

func (s *pauseActivitySuite) SetupSuite() {
}

func (s *pauseActivitySuite) TearDownSuite() {
}

func (s *pauseActivitySuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockTaskGenerator = workflow.NewMockTaskGenerator(s.controller)
s.mockMutableState = workflow.NewMockMutableState(s.controller)

s.mockShard = shard.NewTestContext(
s.controller,
&persistencespb.ShardInfo{
ShardId: 0,
RangeId: 1,
},
tests.NewDynamicConfig(),
)

s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache
s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata
s.mockEventsCache = s.mockShard.MockEventsCache
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

s.logger = s.mockShard.GetLogger()
s.executionInfo = &persistencespb.WorkflowExecutionInfo{
VersionHistories: versionhistory.NewVersionHistories(&historyspb.VersionHistory{}),
FirstExecutionRunId: uuid.New(),
WorkflowExecutionTimerTaskStatus: workflow.TimerTaskStatusCreated,
}
s.mockMutableState.EXPECT().GetExecutionInfo().Return(s.executionInfo).AnyTimes()
s.mockMutableState.EXPECT().GetCurrentVersion().Return(int64(1)).AnyTimes()

s.validator = api.NewCommandAttrValidator(
s.mockShard.GetNamespaceRegistry(),
s.mockShard.GetConfig(),
nil,
)
}

func (s *pauseActivitySuite) TearDownTest() {
s.controller.Finish()
s.mockShard.StopForTest()
}

func (s *pauseActivitySuite) TestPauseActivityAcceptance() {
activityId := "activity_id"
activityInfo := &persistencespb.ActivityInfo{
TaskQueue: "task_queue_name",
ActivityId: activityId,
ActivityType: &commonpb.ActivityType{
Name: "activity_type",
},
}

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(activityInfo, true)
s.mockMutableState.EXPECT().UpdateActivityWithCallback(gomock.Any(), gomock.Any())

err := workflow.PauseActivityById(s.mockMutableState, activityId)
s.NoError(err)
}
4 changes: 3 additions & 1 deletion service/history/api/respondactivitytaskfailed/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ func Invoke(

postActions := &api.UpdateWorkflowAction{}
failure := request.GetFailure()
mutableState.RecordLastActivityStarted(ai)
mutableState.RecordLastActivityCompleteTime(ai)
retryState, err := mutableState.RetryActivity(ai, failure)
if err != nil {
return nil, err
}
// TODO uncomment once RETRY_STATE_PAUSED is supported
// if retryState != enumspb.RETRY_STATE_IN_PROGRESS && retryState != enumspb.RETRY_STATE_PAUSED {
if retryState != enumspb.RETRY_STATE_IN_PROGRESS {
// no more retry, and we want to record the failure event
if _, err := mutableState.AddActivityTaskFailedEvent(scheduledEventID, ai.StartedEventId, failure, retryState, request.GetIdentity(), request.GetWorkerVersion()); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (s *workflowSuite) setupMutableState(uc UsecaseConfig, ai *persistencepb.Ac

currentMutableState.EXPECT().GetWorkflowType().Return(uc.wfType).AnyTimes()
if uc.expectRetryActivity {
currentMutableState.EXPECT().RecordLastActivityStarted(gomock.Any())
ychebotarev marked this conversation as resolved.
Show resolved Hide resolved
currentMutableState.EXPECT().RecordLastActivityCompleteTime(gomock.Any())
currentMutableState.EXPECT().RetryActivity(ai, gomock.Any()).Return(uc.retryActivityState, uc.retryActivityError)
currentMutableState.EXPECT().HasPendingWorkflowTask().Return(false).AnyTimes()
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/api/updateactivityoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Invoke(
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
var err error
response, err = updateActivityOptions(shardContext, validator, mutableState, request)
response, err = updateActivityOptions(validator, mutableState, request)
if err != nil {
return nil, err
}
Expand All @@ -88,7 +88,6 @@ func Invoke(
}

func updateActivityOptions(
shardContext shard.Context,
validator *api.CommandAttrValidator,
mutableState workflow.MutableState,
request *historyservice.UpdateActivityOptionsRequest,
Expand Down
6 changes: 3 additions & 3 deletions service/history/api/updateactivityoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsWfNotRunning() {

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false)

_, err := updateActivityOptions(s.mockShard, s.validator, s.mockMutableState, request)
_, err := updateActivityOptions(s.validator, s.mockMutableState, request)
s.Error(err)
s.ErrorAs(err, &consts.ErrWorkflowCompleted)
}
Expand All @@ -342,7 +342,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsWfNoActivity() {

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(nil, false)
_, err := updateActivityOptions(s.mockShard, s.validator, s.mockMutableState, request)
_, err := updateActivityOptions(s.validator, s.mockMutableState, request)
s.Error(err)
s.ErrorAs(err, &consts.ErrActivityNotFound)
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsAcceptance() {
},
}

response, err := updateActivityOptions(s.mockShard, s.validator, s.mockMutableState, request)
response, err := updateActivityOptions(s.validator, s.mockMutableState, request)

s.NoError(err)
s.NotNil(response)
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/activity_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (r *ActivityStateReplicatorImpl) SyncActivityState(
FirstScheduledTime: request.FirstScheduledTime,
LastAttemptCompleteTime: request.LastAttemptCompleteTime,
Stamp: request.Stamp,
Paused: request.Paused,
},
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func NewExecutableActivityStateTask(
FirstScheduledTime: task.FirstScheduledTime,
LastAttemptCompleteTime: task.LastAttemptCompleteTime,
Stamp: task.Stamp,
Paused: task.Paused,
},

batchable: true,
Expand All @@ -126,6 +127,7 @@ func NewExecutableActivityStateTask(
FirstScheduledTime: task.FirstScheduledTime,
LastAttemptCompleteTime: task.LastAttemptCompleteTime,
Stamp: task.Stamp,
Paused: task.Paused,
}),
}
}
Expand Down
1 change: 1 addition & 0 deletions service/history/replication/raw_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func convertActivityStateReplicationTask(
FirstScheduledTime: activityInfo.FirstScheduledTime,
LastAttemptCompleteTime: activityInfo.LastAttemptCompleteTime,
Stamp: activityInfo.Stamp,
Paused: activityInfo.Paused,
},
},
VisibilityTime: timestamppb.New(taskInfo.VisibilityTimestamp),
Expand Down
2 changes: 2 additions & 0 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
}

if retryState == enumspb.RETRY_STATE_IN_PROGRESS {
// TODO uncommment once RETRY_STATE_PAUSED is supported
// || retryState == enumspb.RETRY_STATE_PAUSED {
result.shouldUpdateMutableState = true
return result, nil
}
Expand Down
16 changes: 7 additions & 9 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,14 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask(
return nil, err
}

if activityInfo.Attempt > task.Attempt {
return nil, nil
}

if activityInfo.Stamp != task.Stamp {
// this retry task is from old Stamp. In this case we should ignore it
return nil, nil
}
// we ignore retry timer task if:
// * this retry task is from old Stamp.
// * attempts is not the same as recorded in activity info.
// * activity is already started.
if activityInfo.Attempt > task.Attempt ||
activityInfo.Stamp != task.Stamp ||
activityInfo.StartedEventId != common.EmptyEventID {

if activityInfo.StartedEventId != common.EmptyEventID {
return nil, nil
}

Expand Down
Loading
Loading