Skip to content

Commit

Permalink
Async task handling improvement (#209)
Browse files Browse the repository at this point in the history
* All threads are core thread

* Drop later submitted pathfinding tasks if task before is not started yet

* Auto-resize is gone

* Refine error handling

* Handle rejected execution

* Limit size and schedule on EntityScheduler

* Allow pr to build

* Remove duplicate path handling
Since it's a very rare case and Kaiiju has already done something to handle this

* Update thread and logger name format

* Core pool to 1

* Revert entity scheduler changes

* Expose queue size to config

* Add reject policy config to pathfinding

* [ci/skip] To uppercase

* [ci/skip] Add co-authors

---------

Co-authored-by: Taiyou06 <[email protected]>
Co-authored-by: Altiami <[email protected]>
  • Loading branch information
3 people authored Feb 6, 2025
1 parent aa48b3b commit b1be529
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-1214.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "dev/1.21.4" ]
pull_request:
branches: [ "ver/1.21.4" ]
branches: [ "dev/1.21.4" ]

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Original project: https://github.com/KaiijuMC/Kaiiju
Original license: GPLv3
Original project: https://github.com/Bloom-host/Petal

Co-authored-by: HaHaWTH <[email protected]>
Co-authored-by: Taiyou06 <[email protected]>
Co-authored-by: Altiami <[email protected]>

This patch was ported downstream from the Petal fork.

Makes most pathfinding-related work happen asynchronously
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package org.dreeam.leaf.async.path;

import com.destroystokyo.paper.util.SneakyThrow;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import net.minecraft.server.MinecraftServer;
import net.minecraft.world.level.pathfinder.Path;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;

Expand All @@ -16,19 +21,59 @@
*/
public class AsyncPathProcessor {

private static final Executor pathProcessingExecutor = new ThreadPoolExecutor(
private static final String THREAD_PREFIX = "Leaf Async Pathfinding";
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor(
1,
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingMaxThreads,
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingKeepalive, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
getQueueImpl(),
new ThreadFactoryBuilder()
.setNameFormat("Leaf Async Pathfinding Thread - %d")
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build()
.build(),
new RejectedTaskHandler()
);

private static class RejectedTaskHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor executor) {
BlockingQueue<Runnable> workQueue = executor.getQueue();
if (!executor.isShutdown()) {
switch (org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingRejectPolicy) {
case FLUSH_ALL -> {
if (!workQueue.isEmpty()) {
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());

workQueue.drainTo(pendingTasks);

for (Runnable pendingTask : pendingTasks) {
pendingTask.run();
}
}
rejectedTask.run();
}
case CALLER_RUNS -> rejectedTask.run();
}
}

if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help.");
lastWarnMillis = System.currentTimeMillis();
}
}
}

protected static CompletableFuture<Void> queue(@NotNull AsyncPath path) {
return CompletableFuture.runAsync(path::process, pathProcessingExecutor);
return CompletableFuture.runAsync(path::process, pathProcessingExecutor)
.orTimeout(60L, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException e) {
LOGGER.warn("Async Pathfinding process timed out", e);
} else SneakyThrow.sneaky(throwable);
return null;
});
}

/**
Expand All @@ -48,4 +93,9 @@ public static void awaitProcessing(@Nullable Path path, Consumer<@Nullable Path>
afterProcessing.accept(path);
}
}
}

private static BlockingQueue<Runnable> getQueueImpl() {
final int queueCapacity = org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingQueueSize;

return new LinkedBlockingQueue<>(queueCapacity);
}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.dreeam.leaf.async.path;

import org.dreeam.leaf.config.LeafConfig;

import java.util.Locale;

public enum PathfindTaskRejectPolicy {
FLUSH_ALL,
CALLER_RUNS;

public static PathfindTaskRejectPolicy fromString(String policy) {
try {
return PathfindTaskRejectPolicy.valueOf(policy.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
LeafConfig.LOGGER.warn("Invalid pathfind task reject policy: {}, falling back to {}.", policy, FLUSH_ALL.toString());
return FLUSH_ALL;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

public class MultithreadedTracker {

private static final Logger LOGGER = LogManager.getLogger("MultithreadedTracker");
private static final String THREAD_PREFIX = "Leaf Async Tracker";
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor trackerExecutor = new ThreadPoolExecutor(
getCorePoolSize(),
Expand Down Expand Up @@ -128,27 +129,23 @@ private static int getCorePoolSize() {
}

private static int getMaxPoolSize() {
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? Integer.MAX_VALUE : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads;
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads;
}

private static long getKeepAliveTime() {
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? 30L : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive;
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive;
}

private static BlockingQueue<Runnable> getQueueImpl() {
if (org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize) {
return new SynchronousQueue<>();
}

final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads * (Math.max(org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads, 4));
final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerQueueSize;

return new LinkedBlockingQueue<>(queueCapacity);
}

private static @NotNull ThreadFactory getThreadFactory() {
return new ThreadFactoryBuilder()
.setThreadFactory(MultithreadedTrackerThread::new)
.setNameFormat("Leaf Async Tracker Thread - %d")
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.dreeam.leaf.config.modules.async;

import org.dreeam.leaf.async.path.PathfindTaskRejectPolicy;
import org.dreeam.leaf.config.ConfigModules;
import org.dreeam.leaf.config.EnumConfigCategory;
import org.dreeam.leaf.config.LeafConfig;
Expand All @@ -13,12 +14,25 @@ public String getBasePath() {
public static boolean enabled = false;
public static int asyncPathfindingMaxThreads = 0;
public static int asyncPathfindingKeepalive = 60;
public static int asyncPathfindingQueueSize = 0;
public static PathfindTaskRejectPolicy asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.FLUSH_ALL;

@Override
public void onLoaded() {
enabled = config.getBoolean(getBasePath() + ".enabled", enabled);
asyncPathfindingMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncPathfindingMaxThreads);
asyncPathfindingKeepalive = config.getInt(getBasePath() + ".keepalive", asyncPathfindingKeepalive);
asyncPathfindingQueueSize = config.getInt(getBasePath() + ".queue-size", asyncPathfindingQueueSize);
asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.fromString(config.getString(getBasePath() + ".reject-policy", asyncPathfindingRejectPolicy.toString(), config.pickStringRegionBased(
"""
The policy to use when the queue is full and a new task is submitted.
FLUSH_ALL: All pending tasks will be run on server thread.
CALLER_RUNS: Newly submitted task will be run on server thread.""",
"""
当队列满时, 新提交的任务将使用以下策略处理.
FLUSH_ALL: 所有等待中的任务都将在主线程上运行.
CALLER_RUNS: 新提交的任务将在主线程上运行."""
)));

if (asyncPathfindingMaxThreads < 0)
asyncPathfindingMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncPathfindingMaxThreads, 1);
Expand All @@ -28,5 +42,8 @@ else if (asyncPathfindingMaxThreads == 0)
asyncPathfindingMaxThreads = 0;
else
LeafConfig.LOGGER.info("Using {} threads for Async Pathfinding", asyncPathfindingMaxThreads);

if (asyncPathfindingQueueSize <= 0)
asyncPathfindingQueueSize = asyncPathfindingMaxThreads * Math.max(asyncPathfindingMaxThreads, 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public String getBasePath() {

public static boolean enabled = false;
public static boolean compatModeEnabled = false;
public static boolean autoResize = false;
public static int asyncEntityTrackerMaxThreads = 0;
public static int asyncEntityTrackerKeepalive = 60;
public static int asyncEntityTrackerQueueSize = 0;

@Override
public void onLoaded() {
Expand All @@ -33,16 +33,9 @@ public void onLoaded() {
"""
是否启用兼容模式,
如果你的服务器安装了 Citizens 或其他类似非发包 NPC 插件, 请开启此项."""));
autoResize = config.getBoolean(getBasePath() + ".auto-resize", autoResize, config.pickStringRegionBased("""
Auto adjust thread pool size based on server load,
This will tweak thread pool size dynamically,
overrides max-threads and keepalive.""",
"""
根据服务器负载自动调整线程池大小,
这会使线程池大小动态调整,
覆盖设置 max-threads 和 keepalive."""));
asyncEntityTrackerMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncEntityTrackerMaxThreads);
asyncEntityTrackerKeepalive = config.getInt(getBasePath() + ".keepalive", asyncEntityTrackerKeepalive);
asyncEntityTrackerQueueSize = config.getInt(getBasePath() + ".queue-size", asyncEntityTrackerQueueSize);

if (asyncEntityTrackerMaxThreads < 0)
asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncEntityTrackerMaxThreads, 1);
Expand All @@ -53,5 +46,8 @@ else if (asyncEntityTrackerMaxThreads == 0)
asyncEntityTrackerMaxThreads = 0;
else
LeafConfig.LOGGER.info("Using {} threads for Async Entity Tracker", asyncEntityTrackerMaxThreads);

if (asyncEntityTrackerQueueSize <= 0)
asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * Math.max(asyncEntityTrackerMaxThreads, 4);
}
}

0 comments on commit b1be529

Please sign in to comment.