Skip to content

Commit

Permalink
Track the actor that triggers the minion task (#14829)
Browse files Browse the repository at this point in the history
* Track the actor that triggered the task

* add java docs

* simplified TaskSchedulingContext

* Track the actor that triggered the task

* add java docs

* simplified TaskSchedulingContext

* variable renames

* fixes
  • Loading branch information
shounakmk219 authored Feb 13, 2025
1 parent 02b3046 commit eb7489c
Show file tree
Hide file tree
Showing 19 changed files with 496 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -80,12 +81,15 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.jersey.server.ManagedAsync;
Expand Down Expand Up @@ -646,29 +650,26 @@ public Map<String, String> scheduleTasks(
Map<String, String> response = new HashMap<>();
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
TaskSchedulingContext context = new TaskSchedulingContext()
.setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
.setMinionInstanceTag(minionInstanceTag)
.setLeader(false);
if (taskType != null) {
// Schedule task for the given task type
PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
generationErrors.addAll(taskInfos.getGenerationErrors());
schedulingErrors.addAll(taskInfos.getSchedulingErrors());
context.setTasksToSchedule(Collections.singleton(taskType));
}
if (tableName != null) {
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName, headers)));
} else {
// Schedule tasks for all task types
Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = tableName != null
? _pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
context.setDatabasesToSchedule(Collections.singleton(database));
}
Map<String, TaskSchedulingInfo> allTaskInfos = _pinotTaskManager.scheduleTasks(context);
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pinot.controller.helix.core.minion;

import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
Expand Down Expand Up @@ -64,8 +66,12 @@ public void execute(JobExecutionContext jobExecutionContext)
ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
return;
}
TaskSchedulingContext context = new TaskSchedulingContext()
.setTablesToSchedule(Collections.singleton(table))
.setTasksToSchedule(Collections.singleton(taskType))
.setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
long jobStartTime = System.currentTimeMillis();
pinotTaskManager.scheduleTaskForTable(taskType, table, null);
pinotTaskManager.scheduleTasks(context);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
jobExecutionContext.getNextFireTime());
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont
if (jobFinishTimeMs > 0) {
taskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(jobFinishTimeMs));
}
String triggeredBy = jobConfig.getTaskConfigMap().values().stream().findFirst()
.map(TaskConfig::getConfigMap)
.map(taskConfigs -> taskConfigs.get(PinotTaskManager.TRIGGERED_BY))
.orElse("");
taskDebugInfo.setTriggeredBy(triggeredBy);
Set<Integer> partitionSet = jobContext.getPartitionSet();
TaskCount subtaskCount = new TaskCount();
for (int partition : partitionSet) {
Expand All @@ -890,6 +895,7 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont
String taskIdForPartition = jobContext.getTaskIdForPartition(partition);
subtaskDebugInfo.setTaskId(taskIdForPartition);
subtaskDebugInfo.setState(partitionState);
subtaskDebugInfo.setTriggeredBy(triggeredBy);
long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition);
if (subtaskStartTimeMs > 0) {
subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs));
Expand Down Expand Up @@ -987,7 +993,8 @@ public Map<String, Map<String, Long>> getTaskMetadataLastUpdateTimeMs() {
return MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore);
}

@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "subtaskInfos"})
@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "triggeredBy",
"subtaskInfos"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class TaskDebugInfo {
// Time at which the task (which may have multiple subtasks) got created.
Expand All @@ -998,6 +1005,7 @@ public static class TaskDebugInfo {
private String _finishTime;
private TaskState _taskState;
private TaskCount _subtaskCount;
private String _triggeredBy;
private List<SubtaskDebugInfo> _subtaskInfos;

public TaskDebugInfo() {
Expand Down Expand Up @@ -1046,6 +1054,15 @@ public TaskState getTaskState() {
return _taskState;
}

public String getTriggeredBy() {
return _triggeredBy;
}

public TaskDebugInfo setTriggeredBy(String triggeredBy) {
_triggeredBy = triggeredBy;
return this;
}

public TaskCount getSubtaskCount() {
return _subtaskCount;
}
Expand All @@ -1055,7 +1072,7 @@ public List<SubtaskDebugInfo> getSubtaskInfos() {
}
}

@JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "taskConfig"})
@JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "triggeredBy", "taskConfig"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class SubtaskDebugInfo {
private String _taskId;
Expand All @@ -1064,6 +1081,7 @@ public static class SubtaskDebugInfo {
private String _finishTime;
private String _participant;
private String _info;
private String _triggeredBy;
private PinotTaskConfig _taskConfig;

public SubtaskDebugInfo() {
Expand Down Expand Up @@ -1121,6 +1139,15 @@ public String getInfo() {
return _info;
}

public String getTriggeredBy() {
return _triggeredBy;
}

public SubtaskDebugInfo setTriggeredBy(String triggeredBy) {
_triggeredBy = triggeredBy;
return this;
}

public PinotTaskConfig getTaskConfig() {
return _taskConfig;
}
Expand Down
Loading

0 comments on commit eb7489c

Please sign in to comment.