diff --git a/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java b/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java index 191a3598..b6240757 100644 --- a/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java +++ b/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java @@ -4,6 +4,7 @@ import com.criteo.hadoop.garmadon.event.proto.SparkEventProtos; import com.criteo.hadoop.garmadon.schema.enums.State; import com.criteo.hadoop.garmadon.schema.events.Header; +import org.apache.spark.SparkConf; import org.apache.spark.scheduler.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,43 @@ private void sendExecutorStateEvent(long time, State state, String executorId, S this.eventHandler.accept(time, buildOverrideHeader(executorId), executorStateEvent.build()); } + private void sendApplicationStateEvent(long time, State state) { + SparkConf conf = new SparkConf(); + String deployMode = conf.get("spark.submit.deployMode", "unknown"); + + int driverMemoryMb = (int) conf.getSizeAsMb("spark.driver.memory", "0"); + int driverMemoryOverheadMb = (int) conf.getSizeAsMb("spark.driver.memoryOverhead", "0"); + int driverCores = (int) conf.getInt("spark.driver.cores", 0); + + int executorMemoryMb = (int) conf.getSizeAsMb("spark.executor.memory", "0"); + int executorMemoryOverheadMb = (int) conf.getSizeAsMb("spark.executor.memoryOverhead", "0"); + int executorCores = (int) conf.getInt("spark.executor.cores", 0); + + int executorInstances = (int) conf.getInt("spark.executor.instances", 0); + boolean dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false); + int dynamicAllocationMinExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0); + int dynamicAllocationMaxExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 0); + int dynamicAllocationInitialExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", 0); + + SparkEventProtos.ApplicationStateEvent.Builder applicationStateEvent = SparkEventProtos.ApplicationStateEvent + .newBuilder() + .setState(state.name()) + .setDeployMode(deployMode) + .setDriverMemoryMb(driverMemoryMb) + .setDriverMemoryOverheadMb(driverMemoryOverheadMb) + .setDriverCores(driverCores) + .setExecutorMemoryMb(executorMemoryMb) + .setExecutorMemoryOverheadMb(executorMemoryOverheadMb) + .setExecutorCores(executorCores) + .setExecutorInstances(executorInstances) + .setDynamicAllocationEnabled(dynamicAllocationEnabled) + .setDynamicAllocationMinExecutors(dynamicAllocationMinExecutors) + .setDynamicAllocationMaxExecutors(dynamicAllocationMaxExecutors) + .setDynamicAllocationInitialExecutors(dynamicAllocationInitialExecutors); + + this.eventHandler.accept(time, header, applicationStateEvent.build()); + } + @Override public void onApplicationStart(SparkListenerApplicationStart applicationStart) { try { @@ -86,11 +124,26 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) { .withApplicationName(applicationStart.appName()) .build()) .toSerializeHeader(); + } catch (Throwable t) { + LOGGER.warn("Failed to initialize header on application startup", t); + } + + try { + sendApplicationStateEvent(applicationStart.time(), State.BEGIN); } catch (Throwable t) { LOGGER.warn("Failed to send event for onApplicationStart", t); } } + @Override + public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { + try { + sendApplicationStateEvent(applicationEnd.time(), State.END); + } catch (Throwable t) { + LOGGER.warn("Failed to send event for onApplicationEnd", t); + } + } + // Stage Events @Override public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { diff --git a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java index 3a6173d4..06b0e23b 100644 --- a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java +++ b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/EventsWithHeader.java @@ -75,6 +75,12 @@ public static Descriptors.Descriptor getDescriptor() throws Descriptors.Descript } } + public static abstract class SparkApplicationStateEvent implements Message { + public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException { + return descriptorForTypeWithHeader(SparkEventProtos.ApplicationStateEvent.getDescriptor()); + } + } + public static abstract class ApplicationEvent implements Message { public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException { return descriptorForTypeWithHeader(ResourceManagerEventProtos.ApplicationEvent.getDescriptor()); diff --git a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java index bf712222..299eb194 100644 --- a/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java +++ b/readers/hdfs/src/main/java/com/criteo/hadoop/garmadon/hdfs/ReaderFactory.java @@ -71,6 +71,8 @@ public class ReaderFactory { EventsWithHeader.SparkRddStorageStatus.class, SparkEventProtos.RDDStorageStatus.newBuilder()); addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_EXECUTOR_STORAGE_STATUS_EVENT, "spark_executor_storage_status", EventsWithHeader.SparkExecutorStorageStatus.class, SparkEventProtos.ExecutorStorageStatus.newBuilder()); + addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_APPLICATION_STATE_EVENT, "spark_application_state", + EventsWithHeader.SparkApplicationStateEvent.class, SparkEventProtos.ApplicationStateEvent.newBuilder()); addTypeMapping(out, GarmadonSerialization.TypeMarker.APPLICATION_EVENT, "application_event", EventsWithHeader.ApplicationEvent.class, ResourceManagerEventProtos.ApplicationEvent.newBuilder()); addTypeMapping(out, GarmadonSerialization.TypeMarker.CONTAINER_EVENT, "container_event", diff --git a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java index 4d900d77..db888fd9 100644 --- a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java +++ b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java @@ -30,6 +30,7 @@ public interface TypeMarker { int SPARK_TASK_EVENT = 3003; int SPARK_EXECUTOR_STORAGE_STATUS_EVENT = 3004; int SPARK_RDD_STORAGE_STATUS_EVENT = 3005; + int SPARK_APPLICATION_STATE_EVENT = 3006; int APPLICATION_EVENT = 4000; int CONTAINER_EVENT = 4001; int FLINK_JOB_MANAGER_EVENT = 5000; @@ -79,6 +80,8 @@ public interface TypeMarker { SparkEventProtos.ExecutorStorageStatus::toByteArray, SparkEventProtos.ExecutorStorageStatus::parseFrom); register(SparkEventProtos.RDDStorageStatus.class, TypeMarker.SPARK_RDD_STORAGE_STATUS_EVENT, "SPARK_RDD_STORAGE_STATUS_EVENT", SparkEventProtos.RDDStorageStatus::toByteArray, SparkEventProtos.RDDStorageStatus::parseFrom); + register(SparkEventProtos.ApplicationStateEvent.class, TypeMarker.SPARK_APPLICATION_STATE_EVENT, "SPARK_APPLICATION_STATE_EVENT", + SparkEventProtos.ApplicationStateEvent::toByteArray, SparkEventProtos.ApplicationStateEvent::parseFrom); // Flink events register(FlinkEventProtos.JobManagerEvent.class, TypeMarker.FLINK_JOB_MANAGER_EVENT, "FLINK_JOB_MANAGER_EVENT", diff --git a/schema/src/main/protobuf/spark_event.proto b/schema/src/main/protobuf/spark_event.proto index a630eb03..f716bb81 100644 --- a/schema/src/main/protobuf/spark_event.proto +++ b/schema/src/main/protobuf/spark_event.proto @@ -135,3 +135,19 @@ message RDDStorageStatus { int64 memory_used = 3; int64 disk_used = 4; } + +message ApplicationStateEvent { + string state = 1; + string deploy_mode = 2; + int32 driver_memory_mb = 3; + int32 driver_memory_overhead_mb = 4; + int32 driver_cores = 5; + int32 executor_memory_mb = 6; + int32 executor_memory_overhead_mb = 7; + int32 executor_cores = 8; + int32 executor_instances = 9; + bool dynamic_allocation_enabled = 10; + int32 dynamic_allocation_min_executors = 11; + int32 dynamic_allocation_max_executors = 12; + int32 dynamic_allocation_initial_executors = 13; +}