From ffec359817c9c4e8fe3fadc0e976d30f75f27171 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Axel=20Rovill=C3=A9?= Date: Wed, 4 Sep 2024 16:44:20 +0200 Subject: [PATCH] Send a subset of Spark properties on application start and end Log basic driver and executor properties. Sending an event both when the application starts and when it ends makes it possible to catch configs that may have changed during execution and so differ from those passed to spark-submit. --- .../spark/listener/GarmadonSparkListener.java | 55 +++++++++++++++++++ .../garmadon/hdfs/EventsWithHeader.java | 6 ++ .../hadoop/garmadon/hdfs/ReaderFactory.java | 2 + .../serialization/GarmadonSerialization.java | 3 + schema/src/main/protobuf/spark_event.proto | 17 ++++++ 5 files changed, 83 insertions(+) diff --git a/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java b/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java index 191a3598..d86cb22c 100644 --- a/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java +++ b/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java @@ -4,6 +4,7 @@ import com.criteo.hadoop.garmadon.event.proto.SparkEventProtos; import com.criteo.hadoop.garmadon.schema.enums.State; import com.criteo.hadoop.garmadon.schema.events.Header; +import org.apache.spark.SparkConf; import org.apache.spark.scheduler.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,45 @@ private void sendExecutorStateEvent(long time, State state, String executorId, S this.eventHandler.accept(time, buildOverrideHeader(executorId), executorStateEvent.build()); } + private void sendApplicationStateEvent(long time, State state) { + SparkConf conf = new SparkConf(); + String deployMode = conf.get("spark.submit.deployMode", "unknown"); + + int driverMemoryMb = (int) conf.getSizeAsMb("spark.driver.memory", "0"); + int 
driverMemoryOverheadMb = (int) conf.getSizeAsMb("spark.driver.memoryOverhead", "0"); + int driverCores = conf.getInt("spark.driver.cores", 0); + + int executorMemoryMb = (int) conf.getSizeAsMb("spark.executor.memory", "0"); + int executorMemoryOverheadMb = (int) conf.getSizeAsMb("spark.executor.memoryOverhead", "0"); + float executorMemoryOverheadFactor = (float) conf.getDouble("spark.executor.memoryOverheadFactor", 0.0); + int executorCores = conf.getInt("spark.executor.cores", 0); + + int executorInstances = conf.getInt("spark.executor.instances", 0); + boolean dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false); + int dynamicAllocationMinExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0); + int dynamicAllocationMaxExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 0); + int dynamicAllocationInitialExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", 0); + + SparkEventProtos.ApplicationStateEvent.Builder applicationStateEvent = SparkEventProtos.ApplicationStateEvent + .newBuilder() + .setState(state.name()) + .setDeployMode(deployMode) + .setDriverMemoryMb(driverMemoryMb) + .setDriverMemoryOverheadMb(driverMemoryOverheadMb) + .setDriverCores(driverCores) + .setExecutorMemoryMb(executorMemoryMb) + .setExecutorMemoryOverheadMb(executorMemoryOverheadMb) + .setExecutorMemoryOverheadFactor(executorMemoryOverheadFactor) + .setExecutorCores(executorCores) + .setExecutorInstances(executorInstances) + .setDynamicAllocationEnabled(dynamicAllocationEnabled) + .setDynamicAllocationMinExecutors(dynamicAllocationMinExecutors) + .setDynamicAllocationMaxExecutors(dynamicAllocationMaxExecutors) + .setDynamicAllocationInitialExecutors(dynamicAllocationInitialExecutors); + + this.eventHandler.accept(time, header, applicationStateEvent.build()); + } + @Override public void onApplicationStart(SparkListenerApplicationStart applicationStart) { try { @@ -86,11 +126,26 @@ public void 
onApplicationStart(SparkListenerApplicationStart applicationStart) { .withApplicationName(applicationStart.appName()) .build()) .toSerializeHeader(); + } catch (Throwable t) { + LOGGER.warn("Failed to initialize header on application startup", t); + } + + try { + sendApplicationStateEvent(applicationStart.time(), State.BEGIN); } catch (Throwable t) { LOGGER.warn("Failed to send event for onApplicationStart", t); } } + @Override + public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { + try { + sendApplicationStateEvent(applicationEnd.time(), State.END); + } catch (Throwable t) { + LOGGER.warn("Failed to send event for onApplicationEnd", t); + } + } + // Stage Events @Override public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { diff --git a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java index 3a6173d4..06b0e23b 100644 --- a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java +++ b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java @@ -75,6 +75,12 @@ public static Descriptors.Descriptor getDescriptor() throws Descriptors.Descript } } + public static abstract class SparkApplicationStateEvent implements Message { + public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException { + return descriptorForTypeWithHeader(SparkEventProtos.ApplicationStateEvent.getDescriptor()); + } + } + public static abstract class ApplicationEvent implements Message { public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException { return descriptorForTypeWithHeader(ResourceManagerEventProtos.ApplicationEvent.getDescriptor()); diff --git a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java 
index bf712222..299eb194 100644 --- a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java +++ b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java @@ -71,6 +71,8 @@ public class ReaderFactory { EventsWithHeader.SparkRddStorageStatus.class, SparkEventProtos.RDDStorageStatus.newBuilder()); addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_EXECUTOR_STORAGE_STATUS_EVENT, "spark_executor_storage_status", EventsWithHeader.SparkExecutorStorageStatus.class, SparkEventProtos.ExecutorStorageStatus.newBuilder()); + addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_APPLICATION_STATE_EVENT, "spark_application_state", + EventsWithHeader.SparkApplicationStateEvent.class, SparkEventProtos.ApplicationStateEvent.newBuilder()); addTypeMapping(out, GarmadonSerialization.TypeMarker.APPLICATION_EVENT, "application_event", EventsWithHeader.ApplicationEvent.class, ResourceManagerEventProtos.ApplicationEvent.newBuilder()); addTypeMapping(out, GarmadonSerialization.TypeMarker.CONTAINER_EVENT, "container_event", diff --git a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java index 4d900d77..db888fd9 100644 --- a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java +++ b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java @@ -30,6 +30,7 @@ public interface TypeMarker { int SPARK_TASK_EVENT = 3003; int SPARK_EXECUTOR_STORAGE_STATUS_EVENT = 3004; int SPARK_RDD_STORAGE_STATUS_EVENT = 3005; + int SPARK_APPLICATION_STATE_EVENT = 3006; int APPLICATION_EVENT = 4000; int CONTAINER_EVENT = 4001; int FLINK_JOB_MANAGER_EVENT = 5000; @@ -79,6 +80,8 @@ public interface TypeMarker { SparkEventProtos.ExecutorStorageStatus::toByteArray, SparkEventProtos.ExecutorStorageStatus::parseFrom); 
register(SparkEventProtos.RDDStorageStatus.class, TypeMarker.SPARK_RDD_STORAGE_STATUS_EVENT, "SPARK_RDD_STORAGE_STATUS_EVENT", SparkEventProtos.RDDStorageStatus::toByteArray, SparkEventProtos.RDDStorageStatus::parseFrom); + register(SparkEventProtos.ApplicationStateEvent.class, TypeMarker.SPARK_APPLICATION_STATE_EVENT, "SPARK_APPLICATION_STATE_EVENT", + SparkEventProtos.ApplicationStateEvent::toByteArray, SparkEventProtos.ApplicationStateEvent::parseFrom); // Flink events register(FlinkEventProtos.JobManagerEvent.class, TypeMarker.FLINK_JOB_MANAGER_EVENT, "FLINK_JOB_MANAGER_EVENT", diff --git a/schema/src/main/protobuf/spark_event.proto b/schema/src/main/protobuf/spark_event.proto index a630eb03..c989b374 100644 --- a/schema/src/main/protobuf/spark_event.proto +++ b/schema/src/main/protobuf/spark_event.proto @@ -135,3 +135,20 @@ message RDDStorageStatus { int64 memory_used = 3; int64 disk_used = 4; } + +message ApplicationStateEvent { + string state = 1; + string deploy_mode = 2; + int32 driver_memory_mb = 3; + int32 driver_memory_overhead_mb = 4; + int32 driver_cores = 5; + int32 executor_memory_mb = 6; + int32 executor_memory_overhead_mb = 7; + float executor_memory_overhead_factor = 8; + int32 executor_cores = 9; + int32 executor_instances = 10; + bool dynamic_allocation_enabled = 11; + int32 dynamic_allocation_min_executors = 12; + int32 dynamic_allocation_max_executors = 13; + int32 dynamic_allocation_initial_executors = 14; +}