From 3d56fe93fa5545eb575f3cdb9e1b065f9081fb48 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Thu, 27 Feb 2025 18:53:38 -0500 Subject: [PATCH 1/2] increase rabbitmq consumer timeout --- internal/msgqueue/rabbitmq/rabbitmq.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msgqueue/rabbitmq/rabbitmq.go b/internal/msgqueue/rabbitmq/rabbitmq.go index c6ae92399..3c920b7d4 100644 --- a/internal/msgqueue/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/rabbitmq/rabbitmq.go @@ -355,7 +355,7 @@ func (t *MessageQueueImpl) initQueue(sub session, q msgqueue.Queue) (string, err if q.DLX() != "" { args["x-dead-letter-exchange"] = "" args["x-dead-letter-routing-key"] = q.DLX() - args["x-consumer-timeout"] = 5000 // 5 seconds + args["x-consumer-timeout"] = 300000 // 5 minutes dlqArgs := make(amqp.Table) From ff60f5435ef1128f2276b543bab69fd940ed8054 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 28 Feb 2025 10:48:48 -0500 Subject: [PATCH 2/2] increase timeouts on pop runs --- internal/services/controllers/workflows/controller.go | 2 +- pkg/repository/prisma/workflow_run.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/services/controllers/workflows/controller.go b/internal/services/controllers/workflows/controller.go index f460f3fa4..f9fa7bdc2 100644 --- a/internal/services/controllers/workflows/controller.go +++ b/internal/services/controllers/workflows/controller.go @@ -164,7 +164,7 @@ func New(fs ...WorkflowsControllerOpt) (*WorkflowsControllerImpl, error) { w.processWorkflowEventsOps = queueutils.NewOperationPool(w.l, time.Second*5, "process workflow events", w.processWorkflowEvents) w.unpausedWorkflowRunsOps = queueutils.NewOperationPool(w.l, time.Second*5, "unpause workflow runs", w.unpauseWorkflowRuns) - w.bumpQueueOps = queueutils.NewOperationPool(w.l, time.Second*5, "bump queue", w.runPollActiveQueuesTenant) + w.bumpQueueOps = queueutils.NewOperationPool(w.l, time.Second*60, "bump queue", w.runPollActiveQueuesTenant) return w, nil } diff --git a/pkg/repository/prisma/workflow_run.go b/pkg/repository/prisma/workflow_run.go index aef3682c2..8623f4636 100644 --- a/pkg/repository/prisma/workflow_run.go +++ b/pkg/repository/prisma/workflow_run.go @@ -1055,7 +1055,7 @@ func (w *workflowRunEngineRepository) PopWorkflowRunsCancelNewest(ctx context.Co func (w *workflowRunEngineRepository) PopWorkflowRunsRoundRobin(ctx context.Context, tenantId string, workflowVersionId string, maxRuns int) ([]*dbsqlc.WorkflowRun, []*dbsqlc.GetStepRunForEngineRow, error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, w.pool, w.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, w.pool, w.l, 30000) if err != nil { return nil, nil, err