diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index a4fdfc8638b..60edb3a011a 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -223,7 +223,7 @@ var ( AllowZeroSleep: true, ReuseTimer: true, NextTimeCacheV2Size: 14, // see note below - Version: UseLastAction, + Version: ActionResultIncludesStatus, } // Note on NextTimeCacheV2Size: This value must be > FutureActionCountForList. Each diff --git a/service/worker/scheduler/workflow_test.go b/service/worker/scheduler/workflow_test.go index 52db0ac5326..bf0acda6ac5 100644 --- a/service/worker/scheduler/workflow_test.go +++ b/service/worker/scheduler/workflow_test.go @@ -1636,11 +1636,6 @@ func (s *workflowSuite) TestUpdate() { } func (s *workflowSuite) TestUpdateNotRetroactive() { - // TODO - remove when AccurateFutureActionTimes becomes the active version - prevTweakables := CurrentTweakablePolicies - CurrentTweakablePolicies.Version = AccurateFutureActionTimes - defer func() { CurrentTweakablePolicies = prevTweakables }() - s.runAcrossContinue( []workflowRun{ { @@ -1876,11 +1871,6 @@ func (s *workflowSuite) TestPauseUnpauseBetweenNominalAndJittered() { } func (s *workflowSuite) TestLimitedActions() { - // TODO - remove when AccurateFutureActionTimes becomes the active version - prevTweakables := CurrentTweakablePolicies - CurrentTweakablePolicies.Version = AccurateFutureActionTimes - defer func() { CurrentTweakablePolicies = prevTweakables }() - // written using low-level mocks so we can sleep forever // limited to 2 diff --git a/tests/schedule_test.go b/tests/schedule_test.go index 42a0097af8a..c70298435fa 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -227,7 +227,13 @@ func (s *ScheduleFunctionalSuite) TestBasics() { s.Eventually(func() bool { return atomic.LoadInt32(&runs) == 2 }, 15*time.Second, 500*time.Millisecond) time.Sleep(2 * time.Second) //nolint:forbidigo - // describe + // wait for visibility to stabilize on completed before calling describe, + // otherwise their recent actions may flake and differ + + visibilityResponse := s.getScheduleEntryFomVisibility(sid, func(ent *schedulepb.ScheduleListEntry) bool { + recentActions := ent.GetInfo().GetRecentActions() + return len(recentActions) >= 2 && recentActions[1].GetStartWorkflowStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED + }) describeResp, err := s.FrontendClient().DescribeSchedule(testcore.NewContext(), &workflowservice.DescribeScheduleRequest{ Namespace: s.Namespace(), @@ -235,6 +241,8 @@ func (s *ScheduleFunctionalSuite) TestBasics() { }) s.NoError(err) + // validate describe response + checkSpec := func(spec *schedulepb.ScheduleSpec) { protorequire.ProtoSliceEqual(s.T(), schedule.Spec.Interval, spec.Interval) s.Nil(spec.Calendar) @@ -275,22 +283,20 @@ func (s *ScheduleFunctionalSuite) TestBasics() { s.Equal(wfSAValue.Data, describeResp.Schedule.Action.GetStartWorkflow().SearchAttributes.IndexedFields[csaKeyword].Data) s.Equal(wfMemo.Data, describeResp.Schedule.Action.GetStartWorkflow().Memo.Fields["wfmemo1"].Data) + // GreaterOrEqual is used as we may have had other runs start while waiting for visibility s.DurationNear(describeResp.Info.CreateTime.AsTime().Sub(createTime), 0, 3*time.Second) - s.EqualValues(2, describeResp.Info.ActionCount) + s.GreaterOrEqual(describeResp.Info.ActionCount, int64(2)) s.EqualValues(0, describeResp.Info.MissedCatchupWindow) s.EqualValues(0, describeResp.Info.OverlapSkipped) - s.EqualValues(0, len(describeResp.Info.RunningWorkflows)) - s.EqualValues(2, len(describeResp.Info.RecentActions)) + s.GreaterOrEqual(len(describeResp.Info.RunningWorkflows), 0) + s.GreaterOrEqual(len(describeResp.Info.RecentActions), 2) action0 := describeResp.Info.RecentActions[0] s.WithinRange(action0.ScheduleTime.AsTime(), createTime, time.Now()) s.True(action0.ScheduleTime.AsTime().UnixNano()%int64(5*time.Second) == 0) s.DurationNear(action0.ActualTime.AsTime().Sub(action0.ScheduleTime.AsTime()), 0, 3*time.Second) - // list + // validate list response - visibilityResponse := s.getScheduleEntryFomVisibility(sid, func(ent *schedulepb.ScheduleListEntry) bool { - return len(ent.GetInfo().GetRecentActions()) >= 2 - }) s.Equal(sid, visibilityResponse.ScheduleId) s.Equal(schSAValue.Data, visibilityResponse.SearchAttributes.IndexedFields[csaKeyword].Data) s.Equal(schSAIntValue.Data, describeResp.SearchAttributes.IndexedFields[csaInt].Data) @@ -318,8 +324,14 @@ func (s *ScheduleFunctionalSuite) TestBasics() { } ex0 := wfResp.Executions[0] s.True(strings.HasPrefix(ex0.Execution.WorkflowId, wid)) - s.True(ex0.Execution.RunId == describeResp.Info.RecentActions[0].GetStartWorkflowResult().RunId || - ex0.Execution.RunId == describeResp.Info.RecentActions[1].GetStartWorkflowResult().RunId) + matchingRunId := false + for _, recentAction := range describeResp.GetInfo().GetRecentActions() { + if ex0.GetExecution().GetRunId() == recentAction.GetStartWorkflowResult().GetRunId() { + matchingRunId = true + break + } + } + s.True(matchingRunId, "ListWorkflowExecutions returned a run ID wasn't in the describe response") s.Equal(wt, ex0.Type.Name) s.Nil(ex0.ParentExecution) // not a child workflow s.Equal(wfMemo.Data, ex0.Memo.Fields["wfmemo1"].Data) @@ -1046,11 +1058,6 @@ func (s *ScheduleFunctionalSuite) TestRateLimit() { } func (s *ScheduleFunctionalSuite) TestListSchedulesReturnsWorkflowStatus() { - // TODO - remove when ActionResultIncludesStatus becomes the active version - prevTweakables := scheduler.CurrentTweakablePolicies - scheduler.CurrentTweakablePolicies.Version = scheduler.ActionResultIncludesStatus - defer func() { scheduler.CurrentTweakablePolicies = prevTweakables }() - sid := "sched-test-list-running" wid := "sched-test-list-running-wf" wt := "sched-test-list-running-wt"