Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Send a subset of Spark properties on application start and end #230

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.criteo.hadoop.garmadon.event.proto.SparkEventProtos;
import com.criteo.hadoop.garmadon.schema.enums.State;
import com.criteo.hadoop.garmadon.schema.events.Header;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -77,6 +78,45 @@ private void sendExecutorStateEvent(long time, State state, String executorId, S
this.eventHandler.accept(time, buildOverrideHeader(executorId), executorStateEvent.build());
}

private void sendApplicationStateEvent(long time, State state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would give a try using the constants in core/src/main/scala/org/apache/spark/internal/config/package.scala to access the conf values because they can provide the default values as well

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those constants are private, I cannot reference them from the listener

SparkConf conf = new SparkConf();
String deployMode = conf.get("spark.submit.deployMode", "unknown");

int driverMemoryMb = (int) conf.getSizeAsMb("spark.driver.memory", "0");
int driverMemoryOverheadMb = (int) conf.getSizeAsMb("spark.driver.memoryOverhead", "0");
int driverCores = conf.getInt("spark.driver.cores", 0);

int executorMemoryMb = (int) conf.getSizeAsMb("spark.executor.memory", "0");
int executorMemoryOverheadMb = (int) conf.getSizeAsMb("spark.executor.memoryOverhead", "0");
aroville-criteo marked this conversation as resolved.
Show resolved Hide resolved
float executorMemoryOverheadFactor = (float) conf.getDouble("spark.executor.memoryOverheadFactor", 0.0);
int executorCores = conf.getInt("spark.executor.cores", 0);

int executorInstances = conf.getInt("spark.executor.instances", 0);
boolean dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false);
int dynamicAllocationMinExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0);
int dynamicAllocationMaxExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 0);
int dynamicAllocationInitialExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", 0);

SparkEventProtos.ApplicationStateEvent.Builder applicationStateEvent = SparkEventProtos.ApplicationStateEvent
.newBuilder()
.setState(state.name())
.setDeployMode(deployMode)
.setDriverMemoryMb(driverMemoryMb)
.setDriverMemoryOverheadMb(driverMemoryOverheadMb)
.setDriverCores(driverCores)
.setExecutorMemoryMb(executorMemoryMb)
.setExecutorMemoryOverheadMb(executorMemoryOverheadMb)
.setExecutorMemoryOverheadFactor(executorMemoryOverheadFactor)
.setExecutorCores(executorCores)
.setExecutorInstances(executorInstances)
.setDynamicAllocationEnabled(dynamicAllocationEnabled)
.setDynamicAllocationMinExecutors(dynamicAllocationMinExecutors)
.setDynamicAllocationMaxExecutors(dynamicAllocationMaxExecutors)
.setDynamicAllocationInitialExecutors(dynamicAllocationInitialExecutors);

this.eventHandler.accept(time, header, applicationStateEvent.build());
}

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
try {
Expand All @@ -86,11 +126,26 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
.withApplicationName(applicationStart.appName())
.build())
.toSerializeHeader();
} catch (Throwable t) {
LOGGER.warn("Failed to initialize header on application startup", t);
}

try {
sendApplicationStateEvent(applicationStart.time(), State.BEGIN);
} catch (Throwable t) {
LOGGER.warn("Failed to send event for onApplicationStart", t);
}
}

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
    // Best-effort reporting: a telemetry failure must never propagate into
    // the Spark driver, so every Throwable is caught and logged.
    try {
        sendApplicationStateEvent(applicationEnd.time(), State.END);
    } catch (Throwable throwable) {
        LOGGER.warn("Failed to send event for onApplicationEnd", throwable);
    }
}

// Stage Events
@Override
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public static Descriptors.Descriptor getDescriptor() throws Descriptors.Descript
}
}

/**
 * Message marker for Spark application state events.
 *
 * <p>Exposes the protobuf descriptor of
 * {@link SparkEventProtos.ApplicationStateEvent} combined with the common
 * Garmadon header (via {@code descriptorForTypeWithHeader}).
 */
public static abstract class SparkApplicationStateEvent implements Message {
    public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
        return descriptorForTypeWithHeader(SparkEventProtos.ApplicationStateEvent.getDescriptor());
    }
}

public static abstract class ApplicationEvent implements Message {
public static Descriptors.Descriptor getDescriptor() throws Descriptors.DescriptorValidationException {
return descriptorForTypeWithHeader(ResourceManagerEventProtos.ApplicationEvent.getDescriptor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class ReaderFactory {
EventsWithHeader.SparkRddStorageStatus.class, SparkEventProtos.RDDStorageStatus.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_EXECUTOR_STORAGE_STATUS_EVENT, "spark_executor_storage_status",
EventsWithHeader.SparkExecutorStorageStatus.class, SparkEventProtos.ExecutorStorageStatus.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.SPARK_APPLICATION_STATE_EVENT, "spark_application_state",
EventsWithHeader.SparkApplicationStateEvent.class, SparkEventProtos.ApplicationStateEvent.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.APPLICATION_EVENT, "application_event",
EventsWithHeader.ApplicationEvent.class, ResourceManagerEventProtos.ApplicationEvent.newBuilder());
addTypeMapping(out, GarmadonSerialization.TypeMarker.CONTAINER_EVENT, "container_event",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface TypeMarker {
int SPARK_TASK_EVENT = 3003;
int SPARK_EXECUTOR_STORAGE_STATUS_EVENT = 3004;
int SPARK_RDD_STORAGE_STATUS_EVENT = 3005;
int SPARK_APPLICATION_STATE_EVENT = 3006;
int APPLICATION_EVENT = 4000;
int CONTAINER_EVENT = 4001;
int FLINK_JOB_MANAGER_EVENT = 5000;
Expand Down Expand Up @@ -79,6 +80,8 @@ public interface TypeMarker {
SparkEventProtos.ExecutorStorageStatus::toByteArray, SparkEventProtos.ExecutorStorageStatus::parseFrom);
register(SparkEventProtos.RDDStorageStatus.class, TypeMarker.SPARK_RDD_STORAGE_STATUS_EVENT, "SPARK_RDD_STORAGE_STATUS_EVENT",
SparkEventProtos.RDDStorageStatus::toByteArray, SparkEventProtos.RDDStorageStatus::parseFrom);
register(SparkEventProtos.ApplicationStateEvent.class, TypeMarker.SPARK_APPLICATION_STATE_EVENT, "SPARK_APPLICATION_STATE_EVENT",
SparkEventProtos.ApplicationStateEvent::toByteArray, SparkEventProtos.ApplicationStateEvent::parseFrom);

// Flink events
register(FlinkEventProtos.JobManagerEvent.class, TypeMarker.FLINK_JOB_MANAGER_EVENT, "FLINK_JOB_MANAGER_EVENT",
Expand Down
17 changes: 17 additions & 0 deletions schema/src/main/protobuf/spark_event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,20 @@ message RDDStorageStatus {
int64 memory_used = 3;
int64 disk_used = 4;
}

// Snapshot of an application's resource-related Spark configuration,
// emitted on application start and end. Numeric fields report 0 (and the
// factor 0.0) when the corresponding property was not explicitly set;
// memory values are expressed in megabytes.
message ApplicationStateEvent {
string state = 1; // Lifecycle state name (e.g. BEGIN, END).
string deploy_mode = 2; // spark.submit.deployMode; "unknown" when absent.
int32 driver_memory_mb = 3; // spark.driver.memory, in MB.
int32 driver_memory_overhead_mb = 4; // spark.driver.memoryOverhead, in MB.
int32 driver_cores = 5; // spark.driver.cores.
int32 executor_memory_mb = 6; // spark.executor.memory, in MB.
int32 executor_memory_overhead_mb = 7; // spark.executor.memoryOverhead, in MB.
float executor_memory_overhead_factor = 8; // spark.executor.memoryOverheadFactor.
int32 executor_cores = 9; // spark.executor.cores.
int32 executor_instances = 10; // spark.executor.instances.
bool dynamic_allocation_enabled = 11; // spark.dynamicAllocation.enabled.
int32 dynamic_allocation_min_executors = 12; // spark.dynamicAllocation.minExecutors.
int32 dynamic_allocation_max_executors = 13; // spark.dynamicAllocation.maxExecutors.
int32 dynamic_allocation_initial_executors = 14; // spark.dynamicAllocation.initialExecutors.
}