Skip to content

Commit

Permalink
Remove SkipGenerateWorkflowTask from signal APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Nov 13, 2024
1 parent 4955f72 commit 3b07474
Show file tree
Hide file tree
Showing 17 changed files with 441 additions and 757 deletions.
1 change: 0 additions & 1 deletion service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func NewWorkflowWithSignal(
signalWithStartRequest.GetSignalInput(),
signalWithStartRequest.GetIdentity(),
signalWithStartRequest.GetHeader(),
signalWithStartRequest.GetSkipGenerateWorkflowTask(),
signalWithStartRequest.GetLinks(),
); err != nil {
return nil, err
Expand Down
17 changes: 1 addition & 16 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,16 +472,11 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
}
}

bufferedEventShouldCreateNewTask := hasBufferedEventsOrMessages && ms.HasAnyBufferedEvent(eventShouldGenerateNewTaskFilter)
if hasBufferedEventsOrMessages && !bufferedEventShouldCreateNewTask {
// Make sure tasks that should not create a new event don't get stuck in ms forever
ms.FlushBufferedEvents()
}
newWorkflowTaskType := enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED
if ms.IsWorkflowExecutionRunning() {
if request.GetForceCreateNewWorkflowTask() || // Heartbeat WT is always of Normal type.
wtFailedShouldCreateNewTask ||
bufferedEventShouldCreateNewTask ||
hasBufferedEventsOrMessages ||
activityNotStartedCancelled {

newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL
Expand Down Expand Up @@ -1039,13 +1034,3 @@ func (handler *WorkflowTaskCompletedHandler) clearStickyTaskQueue(ctx context.Co
}
return nil
}

// Filter function to be passed to mutable_state.HasAnyBufferedEvent
// Returns true if the event should generate a new workflow task
// Currently only signal events with SkipGenerateWorkflowTask=true flag set do not generate tasks
func eventShouldGenerateNewTaskFilter(event *historypb.HistoryEvent) bool {
if event.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
return true
}
return !event.GetWorkflowExecutionSignaledEventAttributes().GetSkipGenerateWorkflowTask()
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,13 @@ func signalWorkflow(
request.GetSignalInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
request.GetLinks(),
); err != nil {
return err
}

// Create a transfer task to schedule a workflow task
if !mutableState.HasPendingWorkflowTask() && !request.GetSkipGenerateWorkflowTask() {
if !mutableState.HasPendingWorkflowTask() {

executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
s.currentMutableState,
)
request := s.randomRequest()
request.SkipGenerateWorkflowTask = false

s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false)
s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(false)
Expand All @@ -166,7 +165,6 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
request.GetSignalInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
request.GetLinks(),
).Return(&history.HistoryEvent{}, nil)
s.currentMutableState.EXPECT().HasPendingWorkflowTask().Return(false)
Expand Down Expand Up @@ -200,7 +198,6 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() {
request.GetSignalInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
request.GetLinks(),
).Return(&history.HistoryEvent{}, nil)
s.currentMutableState.EXPECT().HasPendingWorkflowTask().Return(true)
Expand Down
3 changes: 1 addition & 2 deletions service/history/api/signalworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Invoke(
executionInfo := mutableState.GetExecutionInfo()

// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() && !request.GetSkipGenerateWorkflowTask()
createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff()

if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowId
Expand All @@ -110,7 +110,6 @@ func Invoke(
request.GetInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
externalWorkflowExecution,
request.GetLinks(),
)
Expand Down
22 changes: 8 additions & 14 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,16 +1712,11 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedSingleActivityScheduledWor
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_SignalTaskGeneration() {
resp := s.testRespondWorkflowTaskCompletedSignalGeneration(false)
resp := s.testRespondWorkflowTaskCompletedSignalGeneration()
s.NotNil(resp.GetStartedResponse())
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_SkipSignalTaskGeneration() {
resp := s.testRespondWorkflowTaskCompletedSignalGeneration(true)
s.Nil(resp.GetStartedResponse())
}

func (s *engineSuite) testRespondWorkflowTaskCompletedSignalGeneration(skipGenerateTask bool) *historyservice.RespondWorkflowTaskCompletedResponse {
func (s *engineSuite) testRespondWorkflowTaskCompletedSignalGeneration() *historyservice.RespondWorkflowTaskCompletedResponse {
we := commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
Expand All @@ -1738,13 +1733,12 @@ func (s *engineSuite) testRespondWorkflowTaskCompletedSignalGeneration(skipGener
identity := "testIdentity"

signal := workflowservice.SignalWorkflowExecutionRequest{
Namespace: tests.NamespaceID.String(),
WorkflowExecution: &we,
Identity: identity,
SignalName: "test signal name",
Input: payloads.EncodeString("test input"),
SkipGenerateWorkflowTask: skipGenerateTask,
RequestId: uuid.New(),
Namespace: tests.NamespaceID.String(),
WorkflowExecution: &we,
Identity: identity,
SignalName: "test signal name",
Input: payloads.EncodeString("test input"),
RequestId: uuid.New(),
}
signalRequest := &historyservice.SignalWorkflowExecutionRequest{
NamespaceId: tests.NamespaceID.String(),
Expand Down
2 changes: 0 additions & 2 deletions service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,6 @@ func (b *EventFactory) CreateWorkflowExecutionSignaledEvent(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) *historypb.HistoryEvent {
Expand All @@ -789,7 +788,6 @@ func (b *EventFactory) CreateWorkflowExecutionSignaledEvent(
Input: input,
Identity: identity,
Header: header,
SkipGenerateWorkflowTask: skipGenerateWorkflowTask,
ExternalWorkflowExecution: externalWorkflowExecution,
},
}
Expand Down
2 changes: 0 additions & 2 deletions service/history/historybuilder/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,6 @@ func (b *HistoryBuilder) AddWorkflowExecutionSignaledEvent(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) *historypb.HistoryEvent {
Expand All @@ -677,7 +676,6 @@ func (b *HistoryBuilder) AddWorkflowExecutionSignaledEvent(
input,
identity,
header,
skipGenerateWorkflowTask,
externalWorkflowExecution,
links,
)
Expand Down
16 changes: 0 additions & 16 deletions service/history/ndc/events_reapplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ package ndc
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
enumsspb "go.temporal.io/server/api/enums/v1"
Expand Down Expand Up @@ -90,26 +89,11 @@ func (r *EventsReapplierImpl) ReapplyEvents(
return nil, nil
}

shouldScheduleWorkflowTask := false
for _, event := range reappliedEvents {
switch event.GetEventType() { //nolint:exhaustive
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
signal := event.GetWorkflowExecutionSignaledEventAttributes()
shouldScheduleWorkflowTask = shouldScheduleWorkflowTask || !signal.GetSkipGenerateWorkflowTask()
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
shouldScheduleWorkflowTask = true
}
}

// After reapply event, checking if we should schedule a workflow task
if ms.IsWorkflowPendingOnWorkflowTaskBackoff() {
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
return reappliedEvents, nil
}
if !shouldScheduleWorkflowTask {
// Do not create workflow task when all reapplied signals had SkipGenerateWorkflowTask=true flag set
return reappliedEvents, nil
}

if !ms.HasPendingWorkflowTask() {
if _, err := ms.AddWorkflowTaskScheduledEvent(
Expand Down
3 changes: 0 additions & 3 deletions service/history/ndc/events_reapplier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_Signal() {
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
).Return(event, nil)
msCurrent.EXPECT().HSM().Return(s.hsmNode).AnyTimes()
Expand Down Expand Up @@ -275,7 +274,6 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_PartialAppliedEvent() {
attr1.GetInput(),
attr1.GetIdentity(),
attr1.GetHeader(),
attr1.GetSkipGenerateWorkflowTask(),
event1.Links,
).Return(event1, nil)
msCurrent.EXPECT().IsWorkflowPendingOnWorkflowTaskBackoff().Return(true)
Expand Down Expand Up @@ -323,7 +321,6 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_Error() {
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
).Return(nil, fmt.Errorf("test"))
dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion())
Expand Down
1 change: 0 additions & 1 deletion service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,6 @@ func reapplyEvents(
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
); err != nil {
return reappliedEvents, err
Expand Down
1 change: 0 additions & 1 deletion service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,6 @@ func (s *workflowResetterSuite) TestReapplyEvents() {
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
).Return(&historypb.HistoryEvent{}, nil)
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
Expand Down
2 changes: 0 additions & 2 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,13 @@ type (
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error)
AddWorkflowExecutionSignaledEvent(
signalName string,
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error)
Expand Down
4 changes: 0 additions & 4 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4261,15 +4261,13 @@ func (ms *MutableStateImpl) AddWorkflowExecutionSignaled(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error) {
return ms.AddWorkflowExecutionSignaledEvent(
signalName,
input,
identity,
header,
skipGenerateWorkflowTask,
nil,
links,
)
Expand All @@ -4280,7 +4278,6 @@ func (ms *MutableStateImpl) AddWorkflowExecutionSignaledEvent(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error) {
Expand All @@ -4294,7 +4291,6 @@ func (ms *MutableStateImpl) AddWorkflowExecutionSignaledEvent(
input,
identity,
header,
skipGenerateWorkflowTask,
externalWorkflowExecution,
links,
)
Expand Down
Loading

0 comments on commit 3b07474

Please sign in to comment.