Skip to content

Commit

Permalink
PauseActivity implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Nov 6, 2024
1 parent df12b9a commit 1ad5730
Show file tree
Hide file tree
Showing 21 changed files with 3,786 additions and 3,441 deletions.
3,831 changes: 1,926 additions & 1,905 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1,933 changes: 973 additions & 960 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

1,065 changes: 538 additions & 527 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ message SyncActivityRequest {
google.protobuf.Timestamp last_attempt_complete_time = 19;
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;
}

message SyncActivitiesRequest {
Expand Down Expand Up @@ -647,6 +649,9 @@ message ActivitySyncInfo {
google.protobuf.Timestamp last_attempt_complete_time = 19;
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;

}

message SyncActivityResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ message ActivityInfo {
// It monotonically increments when the activity options are changed.
// It is used to check if activity related tasks are still relevant to their corresponding state machine.
int32 stamp = 41;

// Paused state. When activity is paused it will not advance until unpaused.
// Iw will not be scheduled, timer tasks will not be processed, etc.
// Note: it still can be cancelled/completed.
bool paused = 42;
}

// timer_map column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ message SyncActivityTaskAttributes {
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
// It monotonically increments when the activity options are changed.
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;
}

message HistoryTaskAttributes {
Expand Down
68 changes: 62 additions & 6 deletions service/history/api/pauseactivity/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,73 @@ package pauseactivity
import (
"context"

"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
_ context.Context,
_ *historyservice.PauseActivityRequest,
_ shard.Context,
_ api.WorkflowConsistencyChecker,
ctx context.Context,
request *historyservice.PauseActivityRequest,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.PauseActivityResponse, retError error) {
return nil, serviceerror.NewUnimplemented("PauseActivity is not supported yet")
err := api.GetAndUpdateWorkflowWithNew(
ctx,
nil,
definition.NewWorkflowKey(
request.NamespaceId,
request.GetFrontendRequest().WorkflowId,
request.GetFrontendRequest().RunId,
),
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
var err error
err = pauseActivity(mutableState, request)
if err != nil {
return nil, err
}
return &api.UpdateWorkflowAction{
Noop: false,
CreateWorkflowTask: false,
}, nil
},
nil,
shardContext,
workflowConsistencyChecker,
)

if err != nil {
return nil, err
}

return &historyservice.PauseActivityResponse{}, nil
}

func pauseActivity(mutableState workflow.MutableState, request *historyservice.PauseActivityRequest) error {
if !mutableState.IsWorkflowExecutionRunning() {
return consts.ErrWorkflowCompleted
}
frontendRequest := request.GetFrontendRequest()
activityId := frontendRequest.GetActivityId()

ai, activityFound := mutableState.GetActivityByActivityID(activityId)

if !activityFound {
return consts.ErrActivityNotFound
}
if ai.Paused {
// do nothing
return nil
}

mutableState.UpdateActivityWithCallback(ai, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.Paused = true
})

return mutableState.UpdatePausedEntitiesSearchAttribute()
}
144 changes: 144 additions & 0 deletions service/history/api/pauseactivity/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package pauseactivity

import (
"testing"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
"go.uber.org/mock/gomock"
)

type (
pauseActivitySuite struct {
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.ContextTest
mockEventsCache *events.MockCache
mockNamespaceCache *namespace.MockRegistry
mockTaskGenerator *workflow.MockTaskGenerator
mockMutableState *workflow.MockMutableState
mockClusterMetadata *cluster.MockMetadata

executionInfo *persistencespb.WorkflowExecutionInfo

validator *api.CommandAttrValidator

logger log.Logger
}
)

func TestActivityOptionsSuite(t *testing.T) {
s := new(pauseActivitySuite)
suite.Run(t, s)
}

func (s *pauseActivitySuite) SetupSuite() {
}

func (s *pauseActivitySuite) TearDownSuite() {
}

func (s *pauseActivitySuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockTaskGenerator = workflow.NewMockTaskGenerator(s.controller)
s.mockMutableState = workflow.NewMockMutableState(s.controller)

s.mockShard = shard.NewTestContext(
s.controller,
&persistencespb.ShardInfo{
ShardId: 0,
RangeId: 1,
},
tests.NewDynamicConfig(),
)

s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache
s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata
s.mockEventsCache = s.mockShard.MockEventsCache
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

s.logger = s.mockShard.GetLogger()
s.executionInfo = &persistencespb.WorkflowExecutionInfo{
VersionHistories: versionhistory.NewVersionHistories(&historyspb.VersionHistory{}),
FirstExecutionRunId: uuid.New(),
WorkflowExecutionTimerTaskStatus: workflow.TimerTaskStatusCreated,
}
s.mockMutableState.EXPECT().GetExecutionInfo().Return(s.executionInfo).AnyTimes()
s.mockMutableState.EXPECT().GetCurrentVersion().Return(int64(1)).AnyTimes()

s.validator = api.NewCommandAttrValidator(
s.mockShard.GetNamespaceRegistry(),
s.mockShard.GetConfig(),
nil,
)
}

func (s *pauseActivitySuite) TearDownTest() {
s.controller.Finish()
s.mockShard.StopForTest()
}

func (s *pauseActivitySuite) TestPauseActivityAcceptance() {
activityInfo := &persistencespb.ActivityInfo{
TaskQueue: "task_queue_name",
ActivityId: "activity_id",
ActivityType: &commonpb.ActivityType{
Name: "activity_type",
},
}

request := &historyservice.PauseActivityRequest{
NamespaceId: "NamespaceID",
}

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(activityInfo, true)
s.mockMutableState.EXPECT().UpdatePausedEntitiesSearchAttribute().Return(nil)
s.mockMutableState.EXPECT().UpdateActivityWithCallback(gomock.Any(), gomock.Any())

err := pauseActivity(s.mockMutableState, request)
s.NoError(err)
}
4 changes: 3 additions & 1 deletion service/history/api/respondactivitytaskfailed/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ func Invoke(

postActions := &api.UpdateWorkflowAction{}
failure := request.GetFailure()
mutableState.RecordLastActivityStarted(ai)
mutableState.RecordLastActivityCompleteTime(ai)
retryState, err := mutableState.RetryActivity(ai, failure)
if err != nil {
return nil, err
}
// TODO uncomment once RETRY_STATE_PAUSED is supported
// if retryState != enumspb.RETRY_STATE_IN_PROGRESS && retryState != enumspb.RETRY_STATE_PAUSED {
if retryState != enumspb.RETRY_STATE_IN_PROGRESS {
// no more retry, and we want to record the failure event
if _, err := mutableState.AddActivityTaskFailedEvent(scheduledEventID, ai.StartedEventId, failure, retryState, request.GetIdentity(), request.GetWorkerVersion()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/respondactivitytaskfailed/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (s *workflowSuite) setupMutableState(uc UsecaseConfig, ai *persistencepb.Ac

currentMutableState.EXPECT().GetWorkflowType().Return(uc.wfType).AnyTimes()
if uc.expectRetryActivity {
currentMutableState.EXPECT().RecordLastActivityStarted(gomock.Any())
currentMutableState.EXPECT().RecordLastActivityCompleteTime(gomock.Any())
currentMutableState.EXPECT().RetryActivity(ai, gomock.Any()).Return(uc.retryActivityState, uc.retryActivityError)
currentMutableState.EXPECT().HasPendingWorkflowTask().Return(false).AnyTimes()
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/api/updateactivityoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Invoke(
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
var err error
response, err = updateActivityOptions(shardContext, validator, mutableState, request)
response, err = updateActivityOptions(validator, mutableState, request)
if err != nil {
return nil, err
}
Expand All @@ -88,7 +88,6 @@ func Invoke(
}

func updateActivityOptions(
shardContext shard.Context,
validator *api.CommandAttrValidator,
mutableState workflow.MutableState,
request *historyservice.UpdateActivityOptionsRequest,
Expand Down
6 changes: 3 additions & 3 deletions service/history/api/updateactivityoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsWfNotRunning() {

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false)

_, err := updateActivityOptions(s.mockShard, s.validator, s.mockMutableState, request)
_, err := updateActivityOptions(s.validator, s.mockMutableState, request)
s.Error(err)
s.ErrorAs(err, &consts.ErrWorkflowCompleted)
}
Expand All @@ -342,7 +342,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsWfNoActivity() {

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(nil, false)
_, err := updateActivityOptions(s.mockShard, s.validator, s.mockMutableState, request)
_, err := updateActivityOptions(s.validator, s.mockMutableState, request)
s.Error(err)
s.ErrorAs(err, &consts.ErrActivityNotFound)
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsAcceptance() {
},
}

response, err := updateActivityOptions(s.mockShard, s.validator, s.mockMutableState, request)
response, err := updateActivityOptions(s.validator, s.mockMutableState, request)

s.NoError(err)
s.NotNil(response)
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/activity_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (r *ActivityStateReplicatorImpl) SyncActivityState(
FirstScheduledTime: request.FirstScheduledTime,
LastAttemptCompleteTime: request.LastAttemptCompleteTime,
Stamp: request.Stamp,
Paused: request.Paused,
},
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/history/replication/executable_activity_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func NewExecutableActivityStateTask(
FirstScheduledTime: task.FirstScheduledTime,
LastAttemptCompleteTime: task.LastAttemptCompleteTime,
Stamp: task.Stamp,
Paused: task.Paused,
},

batchable: true,
Expand All @@ -126,6 +127,7 @@ func NewExecutableActivityStateTask(
FirstScheduledTime: task.FirstScheduledTime,
LastAttemptCompleteTime: task.LastAttemptCompleteTime,
Stamp: task.Stamp,
Paused: task.Paused,
}),
}
}
Expand Down
1 change: 1 addition & 0 deletions service/history/replication/raw_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func convertActivityStateReplicationTask(
FirstScheduledTime: activityInfo.FirstScheduledTime,
LastAttemptCompleteTime: activityInfo.LastAttemptCompleteTime,
Stamp: activityInfo.Stamp,
Paused: activityInfo.Paused,
},
},
VisibilityTime: timestamppb.New(taskInfo.VisibilityTimestamp),
Expand Down
2 changes: 2 additions & 0 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
}

if retryState == enumspb.RETRY_STATE_IN_PROGRESS {
// TODO uncommment once RETRY_STATE_PAUSED is supported
// || retryState == enumspb.RETRY_STATE_PAUSED {
result.shouldUpdateMutableState = true
return result, nil
}
Expand Down
19 changes: 9 additions & 10 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,15 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask(
return nil, err
}

if activityInfo.Attempt > task.Attempt {
return nil, nil
}

if activityInfo.Stamp != task.Stamp {
// this retry task is from old Stamp. In this case we should ignore it
return nil, nil
}

if activityInfo.StartedEventId != common.EmptyEventID {
// we ignore retry timer task if:
// * this retry task is from old Stamp.
// * attempts is not the same as recorded in activity info.
// * activity is already started.
// * activity is paused.
if activityInfo.Attempt > task.Attempt ||
activityInfo.Stamp != task.Stamp ||
activityInfo.StartedEventId != common.EmptyEventID ||
activityInfo.Paused {
return nil, nil
}

Expand Down
Loading

0 comments on commit 1ad5730

Please sign in to comment.