Commit
Send a subset of Spark properties on application start and end
Log basic driver and executor properties.
Sending an event both when the application starts and when it ends
makes it possible to catch configs that may have changed during
execution and differ from those passed at spark-submit.
aroville-criteo committed Oct 1, 2024
1 parent 7229ac6 commit ffec359
Showing 5 changed files with 83 additions and 0 deletions.
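
To make the intent concrete, here is a minimal, illustrative sketch (not part of this commit; the class and method names are hypothetical) of how a downstream consumer could compare the BEGIN and END snapshots to detect configuration drift, using the protobuf getters generated from spark_event.proto below:

import com.criteo.hadoop.garmadon.event.proto.SparkEventProtos.ApplicationStateEvent;

// Hypothetical consumer-side helper: flags executor sizing that changed
// between application start (state == "BEGIN") and end (state == "END").
final class ConfigDriftCheck {
    static boolean executorSizingChanged(ApplicationStateEvent begin, ApplicationStateEvent end) {
        return begin.getExecutorInstances() != end.getExecutorInstances()
                || begin.getExecutorMemoryMb() != end.getExecutorMemoryMb()
                || begin.getExecutorCores() != end.getExecutorCores();
    }
}

Either event alone is useful on its own; the pair is what lets a reader distinguish submit-time settings from the effective end-of-run settings.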
@@ -4,6 +4,7 @@
import com.criteo.hadoop.garmadon.event.proto.SparkEventProtos;
import com.criteo.hadoop.garmadon.schema.enums.State;
import com.criteo.hadoop.garmadon.schema.events.Header;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,45 @@ private void sendExecutorStateEvent(long time, State state, String executorId, S
this.eventHandler.accept(time, buildOverrideHeader(executorId), executorStateEvent.build());
}

/**
 * Snapshots driver/executor sizing and dynamic-allocation settings, read from
 * a freshly constructed SparkConf (i.e. the driver's current spark.* system
 * properties), and emits them as an ApplicationStateEvent.
 */
private void sendApplicationStateEvent(long time, State state) {
SparkConf conf = new SparkConf();
String deployMode = conf.get("spark.submit.deployMode", "unknown");

int driverMemoryMb = (int) conf.getSizeAsMb("spark.driver.memory", "0");
int driverMemoryOverheadMb = (int) conf.getSizeAsMb("spark.driver.memoryOverhead", "0");
int driverCores = conf.getInt("spark.driver.cores", 0);

int executorMemoryMb = (int) conf.getSizeAsMb("spark.executor.memory", "0");
int executorMemoryOverheadMb = (int) conf.getSizeAsMb("spark.executor.memoryOverhead", "0");
float executorMemoryOverheadFactor = (float) conf.getDouble("spark.executor.memoryOverheadFactor", 0.0);
int executorCores = conf.getInt("spark.executor.cores", 0);

int executorInstances = conf.getInt("spark.executor.instances", 0);
boolean dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false);
int dynamicAllocationMinExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0);
int dynamicAllocationMaxExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 0);
int dynamicAllocationInitialExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", 0);

SparkEventProtos.ApplicationStateEvent.Builder applicationStateEvent = SparkEventProtos.ApplicationStateEvent
.newBuilder()
.setState(state.name())
.setDeployMode(deployMode)
.setDriverMemoryMb(driverMemoryMb)
.setDriverMemoryOverheadMb(driverMemoryOverheadMb)
.setDriverCores(driverCores)
.setExecutorMemoryMb(executorMemoryMb)
.setExecutorMemoryOverheadMb(executorMemoryOverheadMb)
.setExecutorMemoryOverheadFactor(executorMemoryOverheadFactor)
.setExecutorCores(executorCores)
.setExecutorInstances(executorInstances)
.setDynamicAllocationEnabled(dynamicAllocationEnabled)
.setDynamicAllocationMinExecutors(dynamicAllocationMinExecutors)
.setDynamicAllocationMaxExecutors(dynamicAllocationMaxExecutors)
.setDynamicAllocationInitialExecutors(dynamicAllocationInitialExecutors);

this.eventHandler.accept(time, header, applicationStateEvent.build());
}

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
try {
@@ -86,11 +126,26 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
.withApplicationName(applicationStart.appName())
.build())
.toSerializeHeader();
} catch (Throwable t) {
LOGGER.warn("Failed to initialize header on application startup", t);
}

try {
sendApplicationStateEvent(applicationStart.time(), State.BEGIN);
} catch (Throwable t) {
LOGGER.warn("Failed to send event for onApplicationStart", t);
}
}

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
try {
sendApplicationStateEvent(applicationEnd.time(), State.END);
} catch (Throwable t) {
LOGGER.warn("Failed to send event for onApplicationEnd", t);
}
}

// Stage Events
@Override
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
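Aside (illustrative, not part of this commit): a SparkListener like this one can also be attached to a driver through standard Spark configuration, with a line such as

spark.extraListeners=<fully.qualified.listener.Class>

where the class name is only a placeholder.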
@@ -75,6 +75,12 @@ public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
}
}

public static abstract class SparkApplicationStateEvent implements Message {
public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
return descriptorForTypeWithHeader(SparkEventProtos.ApplicationStateEvent.getDescriptor());
}
}

public static abstract class ApplicationEvent implements Message {
public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
return descriptorForTypeWithHeader(ResourceManagerEventProtos.ApplicationEvent.getDescriptor());
@@ -71,6 +71,8 @@ public class ReaderFactory {
EventsWithHeader.SparkRddStorageStatus.class, SparkEventProtos.RDDStorageStatus.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_EXECUTOR_STORAGE_STATUS_EVENT, "spark_executor_storage_status",
EventsWithHeader.SparkExecutorStorageStatus.class, SparkEventProtos.ExecutorStorageStatus.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_APPLICATION_STATE_EVENT, "spark_application_state",
EventsWithHeader.SparkApplicationStateEvent.class, SparkEventProtos.ApplicationStateEvent.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.APPLICATION_EVENT, "application_event",
EventsWithHeader.ApplicationEvent.class, ResourceManagerEventProtos.ApplicationEvent.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.CONTAINER_EVENT, "container_event",
@@ -30,6 +30,7 @@ public interface TypeMarker {
int SPARK_TASK_EVENT = 3003;
int SPARK_EXECUTOR_STORAGE_STATUS_EVENT = 3004;
int SPARK_RDD_STORAGE_STATUS_EVENT = 3005;
int SPARK_APPLICATION_STATE_EVENT = 3006;
int APPLICATION_EVENT = 4000;
int CONTAINER_EVENT = 4001;
int FLINK_JOB_MANAGER_EVENT = 5000;
@@ -79,6 +80,8 @@ public interface TypeMarker {
SparkEventProtos.ExecutorStorageStatus::toByteArray, SparkEventProtos.ExecutorStorageStatus::parseFrom);
register(SparkEventProtos.RDDStorageStatus.class, TypeMarker.SPARK_RDD_STORAGE_STATUS_EVENT, "SPARK_RDD_STORAGE_STATUS_EVENT",
SparkEventProtos.RDDStorageStatus::toByteArray, SparkEventProtos.RDDStorageStatus::parseFrom);
register(SparkEventProtos.ApplicationStateEvent.class, TypeMarker.SPARK_APPLICATION_STATE_EVENT, "SPARK_APPLICATION_STATE_EVENT",
SparkEventProtos.ApplicationStateEvent::toByteArray, SparkEventProtos.ApplicationStateEvent::parseFrom);

// Flink events
register(FlinkEventProtos.JobManagerEvent.class, TypeMarker.FLINK_JOB_MANAGER_EVENT, "FLINK_JOB_MANAGER_EVENT",
17 changes: 17 additions & 0 deletions schema/src/main/protobuf/spark_event.proto
@@ -135,3 +135,20 @@ message RDDStorageStatus {
int64 memory_used = 3;
int64 disk_used = 4;
}

message ApplicationStateEvent {
string state = 1;
string deploy_mode = 2;
int32 driver_memory_mb = 3;
int32 driver_memory_overhead_mb = 4;
int32 driver_cores = 5;
int32 executor_memory_mb = 6;
int32 executor_memory_overhead_mb = 7;
float executor_memory_overhead_factor = 8;
int32 executor_cores = 9;
int32 executor_instances = 10;
bool dynamic_allocation_enabled = 11;
int32 dynamic_allocation_min_executors = 12;
int32 dynamic_allocation_max_executors = 13;
int32 dynamic_allocation_initial_executors = 14;
}
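
For reference, a minimal sketch of the serialize/parse round trip that the register(...) call above wires up, assuming the Java classes generated from this proto (the wrapping class name is hypothetical):

import com.google.protobuf.InvalidProtocolBufferException;
import com.criteo.hadoop.garmadon.event.proto.SparkEventProtos;

public class ApplicationStateEventRoundTrip {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build an event the way the listener does at application start.
        SparkEventProtos.ApplicationStateEvent event = SparkEventProtos.ApplicationStateEvent.newBuilder()
                .setState("BEGIN")
                .setDeployMode("cluster")
                .setExecutorMemoryMb(4096)
                .setDynamicAllocationEnabled(true)
                .build();

        // Serialize and parse back, as the registered toByteArray/parseFrom
        // method references do.
        byte[] payload = event.toByteArray();
        SparkEventProtos.ApplicationStateEvent parsed =
                SparkEventProtos.ApplicationStateEvent.parseFrom(payload);

        // Unset proto3 fields come back as 0/false/"", matching the defaults
        // used in sendApplicationStateEvent.
        assert parsed.getExecutorInstances() == 0;
    }
}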
