Skip to content

Commit

Permalink
Fixed usage of AgentAwareTask
Browse files Browse the repository at this point in the history
  • Loading branch information
fkorotkov committed Sep 29, 2024
1 parent e2188a0 commit 9085845
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions internal/worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (worker *Worker) startTask(
}

worker.imagesCounter.Add(ctx, 1, metric.WithAttributes(inst.Attributes()...))
go worker.runTask(taskCtx, agentAwareTask, upstream, inst, taskID, agentAwareTask.ClientSecret)
go worker.runTask(taskCtx, upstream, inst, agentAwareTask.CliVersion,
taskID, agentAwareTask.ClientSecret, agentAwareTask.ServerSecret)

worker.logger.Infof("started task %s", taskID)
}
Expand Down Expand Up @@ -113,15 +114,16 @@ func (worker *Worker) getInstance(

func (worker *Worker) runTask(
ctx context.Context,
agentAwareTask *api.PollResponse_AgentAwareTask,
upstream *upstreampkg.Upstream,
inst abstract.Instance,
cliVersion string,
taskID string,
clientSecret string,
serverSecret string,
) {
// Provide tags for Sentry: task ID and upstream worker name
cirrusSentryTags := map[string]string{
"cirrus.task_id": agentAwareTask.TaskId,
"cirrus.task_id": taskID,
"cirrus.upstream_worker_name": upstream.WorkerName(),
}

Expand Down Expand Up @@ -149,15 +151,15 @@ func (worker *Worker) runTask(
defer func() {
if err := inst.Close(ctx); err != nil {
worker.logger.Errorf("failed to close persistent worker instance for task %s: %v",
agentAwareTask.TaskId, err)
taskID, err)
}

worker.taskCompletions <- agentAwareTask.TaskId
worker.taskCompletions <- taskID
}()

if err := upstream.TaskStarted(ctx, api.OldTaskIdentification(taskID, clientSecret)); err != nil {
worker.logger.Errorf("failed to notify the server about the started task %s: %v",
agentAwareTask.TaskId, err)
taskID, err)

return
}
Expand All @@ -170,22 +172,22 @@ func (worker *Worker) runTask(
config := runconfig.RunConfig{
ProjectDir: "",
Endpoint: upstream.AgentEndpoint(),
ServerSecret: agentAwareTask.ServerSecret,
ClientSecret: agentAwareTask.ClientSecret,
TaskID: agentAwareTask.TaskId,
ServerSecret: serverSecret,
ClientSecret: clientSecret,
TaskID: taskID,
AdditionalEnvironment: map[string]string{
"CIRRUS_SENTRY_TAGS": strings.Join(cirrusSentryTagsFormatted, ","),
},
}

if err := config.SetCLIVersionWithoutDowngrade(agentAwareTask.CliVersion); err != nil {
worker.logger.Warnf("failed to set CLI's version for task %s: %v", agentAwareTask.TaskId, err)
if err := config.SetCLIVersionWithoutDowngrade(cliVersion); err != nil {
worker.logger.Warnf("failed to set CLI's version for task %s: %v", taskID, err)
}

err := inst.Run(ctx, &config)

if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(ctx.Err(), context.Canceled) {
worker.logger.Errorf("failed to run task %s: %v", agentAwareTask.TaskId, err)
worker.logger.Errorf("failed to run task %s: %v", taskID, err)

boundedCtx, cancel := context.WithTimeout(context.Background(), perCallTimeout)
defer cancel()
Expand All @@ -205,12 +207,12 @@ func (worker *Worker) runTask(
})

err := upstream.TaskFailed(boundedCtx, &api.TaskFailedRequest{
TaskIdentification: api.OldTaskIdentification(taskID, agentAwareTask.ClientSecret),
TaskIdentification: api.OldTaskIdentification(taskID, clientSecret),
Message: err.Error(),
})
if err != nil {
worker.logger.Errorf("failed to notify the server about the failed task %s: %v",
agentAwareTask.TaskId, err)
taskID, err)
localHub.WithScope(func(scope *sentry.Scope) {
scope.SetTags(cirrusSentryTags)

Expand All @@ -231,9 +233,9 @@ func (worker *Worker) runTask(
boundedCtx, cancel := context.WithTimeout(context.Background(), perCallTimeout)
defer cancel()

if err = upstream.TaskStopped(boundedCtx, api.OldTaskIdentification(taskID, agentAwareTask.ClientSecret)); err != nil {
if err = upstream.TaskStopped(boundedCtx, api.OldTaskIdentification(taskID, clientSecret)); err != nil {
worker.logger.Errorf("failed to notify the server about the stopped task %s: %v",
agentAwareTask.TaskId, err)
taskID, err)
localHub.WithScope(func(scope *sentry.Scope) {
scope.SetTags(cirrusSentryTags)
scope.SetLevel(sentry.LevelFatal)
Expand Down

0 comments on commit 9085845

Please sign in to comment.