Skip to content

Commit

Permalink
Register ApplicationStateEvent in GarmadonSerialization, revert using…
Browse files Browse the repository at this point in the history
… SparkSession

SparkSession is unavailable at this stage
  • Loading branch information
aroville-criteo committed Sep 20, 2024
1 parent de36d40 commit 7007242
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.criteo.hadoop.garmadon.schema.events.Header;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.*;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,13 +79,7 @@ private void sendExecutorStateEvent(long time, State state, String executorId, S
}

private void sendApplicationStateEvent(long time, State state) {
if (!SparkSession.getActiveSession().isDefined()) {
LOGGER.warn("Unable to retrieve a SparkSession, not recording application event.");
return;
}

SparkSession session = SparkSession.getActiveSession().get();
SparkConf conf = session.sparkContext().conf();
SparkConf conf = new SparkConf();
String deployMode = conf.get("spark.submit.deployMode", "unknown");

int driverMemoryMb = (int) conf.getSizeAsMb("spark.driver.memory", "0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public interface TypeMarker {
SparkEventProtos.ExecutorStorageStatus::toByteArray, SparkEventProtos.ExecutorStorageStatus::parseFrom);
register(SparkEventProtos.RDDStorageStatus.class, TypeMarker.SPARK_RDD_STORAGE_STATUS_EVENT, "SPARK_RDD_STORAGE_STATUS_EVENT",
SparkEventProtos.RDDStorageStatus::toByteArray, SparkEventProtos.RDDStorageStatus::parseFrom);
register(SparkEventProtos.ApplicationStateEvent.class, TypeMarker.SPARK_APPLICATION_STATE_EVENT, "SPARK_APPLICATION_STATE_EVENT",
SparkEventProtos.ApplicationStateEvent::toByteArray, SparkEventProtos.ApplicationStateEvent::parseFrom);

// Flink events
register(FlinkEventProtos.JobManagerEvent.class, TypeMarker.FLINK_JOB_MANAGER_EVENT, "FLINK_JOB_MANAGER_EVENT",
Expand Down

0 comments on commit 7007242

Please sign in to comment.