Skip to content

Commit

Permalink
Decouple tracking and storage in MinionEventObserver using MinionTask…
Browse files Browse the repository at this point in the history
…ObserverStorageManager (#15044)

* Decouple tracking and storage in MinionEventObserver

* ensure stats deletion upon event observer removal

* fixes

* feedback fixes

* fix MinionTaskBaseObserverStats to allow extending it

* add license

* fixes

* added missing docs and pre checks on method params

* reverting to original behaviour and not failing on null taskId input
  • Loading branch information
shounakmk219 authored Feb 24, 2025
1 parent 9ca8a87 commit 49f0eba
Show file tree
Hide file tree
Showing 23 changed files with 724 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.minion.event.MinionEventObserverFactory;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;

import static org.testng.Assert.assertTrue;

Expand All @@ -39,6 +40,11 @@ public class TestEventObserverFactory implements MinionEventObserverFactory {
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager,
MinionTaskObserverStorageManager taskProgressManager) {
}

@Override
public String getTaskType() {
return SimpleMinionClusterIntegrationTest.TASK_TYPE;
Expand All @@ -47,6 +53,10 @@ public String getTaskType() {
@Override
public MinionEventObserver create() {
return new MinionEventObserver() {
@Override
public void init(MinionTaskObserverStorageManager progressManager) {
}

@Override
public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
SimpleMinionClusterIntegrationTest.TASK_START_NOTIFIED.set(true);
Expand All @@ -68,6 +78,10 @@ public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) {
SimpleMinionClusterIntegrationTest.TASK_ERROR_NOTIFIED.set(true);
}

@Override
public void cleanup() {
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.pinot.common.version.PinotVersion;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.minion.event.DefaultMinionTaskObserverStorageManager;
import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
import org.apache.pinot.minion.event.MinionEventObserverFactory;
import org.apache.pinot.minion.event.MinionEventObservers;
Expand All @@ -66,8 +67,10 @@
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
Expand Down Expand Up @@ -123,7 +126,8 @@ public void init(PinotConfiguration config)
_helixManager = new ZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress);
MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
_taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
_eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager);
_eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager,
getMinionTaskProgressManager());
_executorService =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
MinionEventObservers.init(_config, _executorService);
Expand Down Expand Up @@ -168,6 +172,26 @@ public void registerEventObserverFactory(MinionEventObserverFactory eventObserve
_eventObserverFactoryRegistry.registerEventObserverFactory(eventObserverFactory);
}

public MinionTaskObserverStorageManager getMinionTaskProgressManager() {
String progressManagerClassName = _config.getProperty(MinionConf.MINION_TASK_PROGRESS_MANAGER_CLASS);
MinionTaskObserverStorageManager progressManager = null;
if (StringUtils.isNotEmpty(progressManagerClassName)) {
try {
LOGGER.info("Trying to create MinionTaskProgressManager with {}", progressManagerClassName);
progressManager = PluginManager.get().createInstance(progressManagerClassName);
} catch (Exception e) {
LOGGER.error("Unable to load MinionTaskProgressManager with class {}",
progressManagerClassName, e);
}
}
if (progressManager == null) {
LOGGER.info("Creating MinionTaskProgressManager with DefaultMinionTaskProgressManager");
progressManager = new DefaultMinionTaskObserverStorageManager();
}
progressManager.init(_config);
return progressManager;
}

@Override
public ServiceRole getServiceRole() {
return ServiceRole.MINION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

public class MinionConf extends PinotConfiguration {
public static final String END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY = "pinot.minion.endReplaceSegments.timeoutMs";
public static final String MINION_TASK_PROGRESS_MANAGER_CLASS = "pinot.minion.taskProgressManager.class";
public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10 * 60 * 1000; // 10 mins

public MinionConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@
package org.apache.pinot.minion.event;

import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;


/**
* Base factory for {@link MinionEventObserver}.
*/
public abstract class BaseMinionProgressObserverFactory implements MinionEventObserverFactory {

protected MinionTaskObserverStorageManager _taskProgressManager;

/**
* Initializes the task executor factory.
*/
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager,
MinionTaskObserverStorageManager taskProgressManager) {
_taskProgressManager = taskProgressManager;
}

/**
* Returns the task type of the event observer.
*/
Expand All @@ -41,6 +50,8 @@ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
* Creates a new task event observer.
*/
public MinionEventObserver create() {
return new MinionProgressObserver();
MinionProgressObserver observer = new MinionProgressObserver();
observer.init(_taskProgressManager);
return observer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@

import javax.annotation.Nullable;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;


/**
* Default no-op minion event observer which can be extended.
*/
public class DefaultMinionEventObserver implements MinionEventObserver {

protected MinionTaskObserverStorageManager _observerStorageManager;

@Override
public void init(MinionTaskObserverStorageManager observerStorageManager) {
_observerStorageManager = observerStorageManager;
}

@Override
public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
}
Expand All @@ -42,4 +50,8 @@ public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
@Override
public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) {
}

@Override
public void cleanup() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.minion.event;

import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;


public class DefaultMinionEventObserverFactory implements MinionEventObserverFactory {
Expand All @@ -36,6 +37,11 @@ public static DefaultMinionEventObserverFactory getInstance() {
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager,
MinionTaskObserverStorageManager taskProgressManager) {
}

@Override
public String getTaskType() {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.minion.event;

import com.google.common.base.Preconditions;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class DefaultMinionTaskObserverStorageManager implements MinionTaskObserverStorageManager {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMinionTaskObserverStorageManager.class);

public static final String MAX_NUM_STATUS_TO_TRACK = "pinot.minion.task.maxNumStatusToTrack";
public static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;

private final Map<String, MinionTaskBaseObserverStats> _minionTaskProgressStatsMap = new HashMap<>();
private int _maxNumStatusToTrack;

private static final DefaultMinionTaskObserverStorageManager DEFAULT_INSTANCE;
static {
DEFAULT_INSTANCE = new DefaultMinionTaskObserverStorageManager();
DEFAULT_INSTANCE.init(new MinionConf());
}

public static DefaultMinionTaskObserverStorageManager getDefaultInstance() {
return DEFAULT_INSTANCE;
}

@Override
public void init(PinotConfiguration configuration) {
try {
_maxNumStatusToTrack =
Integer.parseInt(configuration.getProperty(DefaultMinionTaskObserverStorageManager.MAX_NUM_STATUS_TO_TRACK));
} catch (NumberFormatException e) {
LOGGER.warn("Unable to parse the configured value: {}, using the default value: {} instead.",
DefaultMinionTaskObserverStorageManager.MAX_NUM_STATUS_TO_TRACK,
DefaultMinionTaskObserverStorageManager.DEFAULT_MAX_NUM_STATUS_TO_TRACK);
_maxNumStatusToTrack = DefaultMinionTaskObserverStorageManager.DEFAULT_MAX_NUM_STATUS_TO_TRACK;
}
}

@Nullable
@Override
public MinionTaskBaseObserverStats getTaskProgress(String taskId) {
if (StringUtils.isNotEmpty(taskId) && _minionTaskProgressStatsMap.containsKey(taskId)) {
return new MinionTaskBaseObserverStats(_minionTaskProgressStatsMap.get(taskId));
}
return null;
}

@Override
public synchronized void setTaskProgress(String taskId, MinionTaskBaseObserverStats progress) {
Preconditions.checkNotNull(progress, "Cannot store null MinionTaskBaseObserverStats object.");
_minionTaskProgressStatsMap.put(taskId, progress);
Deque<MinionTaskBaseObserverStats.StatusEntry> progressLogs = progress.getProgressLogs();
int logSize = progressLogs.size();
int overflow = Math.max(logSize - _maxNumStatusToTrack, 0);
if (overflow > 0) {
progressLogs.pollFirst();
}
}

@Override
public MinionTaskBaseObserverStats deleteTaskProgress(String taskId) {
if (StringUtils.isNotEmpty(taskId) && _minionTaskProgressStatsMap.containsKey(taskId)) {
return _minionTaskProgressStatsMap.remove(taskId);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;
import org.apache.pinot.spi.utils.PinotReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,6 +48,11 @@ public class EventObserverFactoryRegistry {
* convention can significantly reduce the time of class scanning.
*/
public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
this(zkMetadataManager, DefaultMinionTaskObserverStorageManager.getDefaultInstance());
}

public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager,
MinionTaskObserverStorageManager taskProgressManager) {
long startTimeMs = System.currentTimeMillis();
Set<Class<?>> classes = PinotReflectionUtils
.getClassesThroughReflection(EVENT_OBSERVER_FACTORY_PACKAGE_REGEX_PATTERN, EventObserverFactory.class);
Expand All @@ -55,7 +61,7 @@ public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManage
if (annotation.enabled()) {
try {
MinionEventObserverFactory eventObserverFactory = (MinionEventObserverFactory) clazz.newInstance();
eventObserverFactory.init(zkMetadataManager);
eventObserverFactory.init(zkMetadataManager, taskProgressManager);
registerEventObserverFactory(eventObserverFactory);
} catch (Exception e) {
LOGGER.error("Caught exception while initializing and registering event observer factory: {}, skipping it",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

import javax.annotation.Nullable;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;


/**
* The <code>MinionEventObserver</code> interface provides call backs for Minion events.
*/
public interface MinionEventObserver {

void init(MinionTaskObserverStorageManager progressManager);

/**
* Invoked when a minion task starts.
*
Expand All @@ -48,6 +52,11 @@ default Object getProgress() {
return null;
}

@Nullable
default MinionTaskBaseObserverStats getProgressStats() {
return null;
}

/**
* Invoked when a minion task succeeds.
*
Expand Down Expand Up @@ -86,4 +95,10 @@ default MinionTaskState getTaskState() {
default long getStartTs() {
return -1;
}

/**
* Place to handle the cleanup required on {@link MinionTaskObserverStorageManager} for the associated task.
* This method should be called before removing the observer.
*/
void cleanup();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.minion.event;

import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.spi.tasks.MinionTaskObserverStorageManager;


/**
Expand All @@ -31,6 +32,12 @@ public interface MinionEventObserverFactory {
*/
void init(MinionTaskZkMetadataManager zkMetadataManager);

/**
* Initializes the task executor factory with the specified {@link MinionTaskObserverStorageManager}
* to manage the storage of stats recorded by the {@link MinionEventObserver}.
*/
void init(MinionTaskZkMetadataManager zkMetadataManager, MinionTaskObserverStorageManager taskProgressManager);

/**
* Returns the task type of the event observer.
*/
Expand Down
Loading

0 comments on commit 49f0eba

Please sign in to comment.