Skip to content

Commit

Permalink
[DSIP-73] Add dolphinscheduler-task-executor module to unify the task…
Browse files Browse the repository at this point in the history
… execution logic
  • Loading branch information
ruanwenjun committed Nov 11, 2024
1 parent 5e2abd7 commit a911cf2
Show file tree
Hide file tree
Showing 255 changed files with 5,515 additions and 6,781 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -306,11 +306,11 @@ public Result stopTask(User loginUser, long projectCode, Integer taskInstanceId)
}

// todo: we only support streaming task for now
final TaskInstanceKillResponse taskInstanceKillResponse = Clients
.withService(ITaskInstanceOperator.class)
final TaskExecutorKillResponse taskExecutorKillResponse = Clients
.withService(IPhysicalTaskExecutorOperator.class)
.withHost(taskInstance.getHost())
.killTask(new TaskInstanceKillRequest(taskInstanceId));
log.info("TaskInstance kill response: {}", taskInstanceKillResponse);
.killTask(TaskExecutorKillRequest.of(taskInstanceId));
log.info("TaskInstance kill response: {}", taskExecutorKillResponse);

putMsg(result, Status.SUCCESS);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
@Slf4j
public class ThreadUtils {

/**
* Create a daemon fixed thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
*/
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadNameFormat, int threadsNum) {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}
Expand All @@ -43,9 +49,10 @@ public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(S
* Create a daemon scheduler thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
* @param threadsNum the number of threads in the pool
*/
public static ScheduledExecutorService newDaemonScheduledExecutorService(String threadNameFormat, int threadsNum) {
public static ScheduledExecutorService newDaemonScheduledExecutorService(final String threadNameFormat,
final int threadsNum) {
return Executors.newScheduledThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,47 @@
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.experimental.SuperBuilder;

/**
* The abstract class of delay event, the event will be triggered after the delay time.
* <p> You can extend this class to implement your own delay event.
*/
@SuperBuilder
public abstract class AbstractDelayEvent implements IEvent, Delayed {

protected long delayTime;

protected long triggerTimeInMillis;

@Deprecated
protected RetryPolicy retryPolicy;

public AbstractDelayEvent() {
this(0);
}

public AbstractDelayEvent(long delayTime) {
this(delayTime, null);
}

public AbstractDelayEvent(final long delayTime, final RetryPolicy retryPolicy) {
if (delayTime == 0) {
this.triggerTimeInMillis = System.currentTimeMillis();
} else {
this.triggerTimeInMillis = System.currentTimeMillis() + delayTime;
}
this.retryPolicy = retryPolicy;
}

@Override
public boolean isRetryable() {
return retryPolicy != null;
}

@Override
public RetryPolicy getRetryPolicy() {
return retryPolicy;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public Optional<T> poll() {
return Optional.ofNullable(delayEventQueue.poll());
}

@Override
public Optional<T> peek() {
return Optional.ofNullable(delayEventQueue.peek());
}

@Override
public Optional<T> remove() {
return Optional.ofNullable(delayEventQueue.remove());
}

@Override
public boolean isEmpty() {
return delayEventQueue.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,37 @@
*/
public interface IEvent {

/**
* Whether the event can retry, if true the event will be stored in the event bus until the retry policy break.
*/
boolean isRetryable();

/**
* Get the retry policy.
* <p> Use the control retry times and interval.
*/
RetryPolicy getRetryPolicy();

interface RetryPolicy {

/**
* Whether the retry policy has been broken.
*/
boolean isRetryPolicyBroken();

/**
* Increase the retry times.
*/
void increaseRetryTimes();

void setNextRetryTimeStampWithFixedStep();

void setNextRetryTimeStampWithExponentialStep();

/**
* Get the next time which the event can retry.
*/
long getNextRetryTimeStamp();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ public interface IEventBus<T extends IEvent> {
*/
Optional<T> poll() throws InterruptedException;

/**
* peek the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> peek();

/**
* Remove the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> remove();

/**
* Whether the bus is empty.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.eventbus;

import java.time.Duration;

public class LinearTimeRetryPolicy implements IEvent.RetryPolicy {

private int currentRetryTimes = 0;

private final long firstRetryTimeStamp = System.currentTimeMillis();

private long nextRetryTimeStamp = firstRetryTimeStamp;

private static final int maxRetryTimes = 180;

private static final long maxRetryDuration = Duration.ofDays(1).toMillis();

private static final long baseRetryInterval = Duration.ofSeconds(10).toMillis();

@Override
public boolean isRetryPolicyBroken() {
if (currentRetryTimes > maxRetryTimes) {
return true;
}
if (nextRetryTimeStamp > firstRetryTimeStamp + maxRetryDuration) {
return true;
}
return false;
}

@Override
public void increaseRetryTimes() {
currentRetryTimes++;
setNextRetryTimeStampWithExponentialStep();
}

@Override
public void setNextRetryTimeStampWithFixedStep() {
nextRetryTimeStamp += baseRetryInterval;
}

@Override
public void setNextRetryTimeStampWithExponentialStep() {
nextRetryTimeStamp += baseRetryInterval * currentRetryTimes;
}

@Override
public long getNextRetryTimeStamp() {
return nextRetryTimeStamp;
}

@Override
public String toString() {
return "LinearTimeRetryPolicy{" +
"firstRetryTimeStamp=" + firstRetryTimeStamp +
", currentRetryTimes=" + currentRetryTimes +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,19 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.TimeZone;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

/**
* json serialize or deserialize
*/
@Slf4j
public class JsonSerializer {

Expand All @@ -60,13 +57,6 @@ private JsonSerializer() {

}

/**
* serialize to byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] serialize(T obj) {
if (obj == null) {
return null;
Expand All @@ -79,44 +69,14 @@ public static <T> byte[] serialize(T obj) {
}
}

/**
* serialize to string
*
* @param obj object
* @param <T> object type
* @return string
*/
public static <T> String serializeToString(T obj) {
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("serializeToString exception!", e);
}

return json;
}

/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
@SneakyThrows
public static <T> T deserialize(byte[] src, Class<T> clazz) {
if (src == null) {
return null;
}

String json = new String(src, StandardCharsets.UTF_8);
try {
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
log.error("deserialize exception!", e);
return null;
}
return objectMapper.readValue(json, clazz);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ private void processReceived(final Channel channel, final Transporter transporte
} else {
args = new Object[standardRpcRequest.getArgs().length];
for (int i = 0; i < standardRpcRequest.getArgs().length; i++) {
args[i] = JsonSerializer.deserialize(standardRpcRequest.getArgs()[i],
args[i] = JsonSerializer.deserialize(
standardRpcRequest.getArgs()[i],
standardRpcRequest.getArgsTypes()[i]);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand All @@ -35,9 +33,6 @@ public interface ILogService {
@RpcMethod
TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest);

@RpcMethod
GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest);

@RpcMethod
void removeTaskInstanceLog(String taskInstanceLogAbsolutePath);

Expand Down
Loading

0 comments on commit a911cf2

Please sign in to comment.