From 7007242b6d539c6376d15e60d66d864aee716b0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Axel=20Rovill=C3=A9?= Date: Fri, 20 Sep 2024 10:38:20 +0200 Subject: [PATCH] Register ApplicationStateEvent in GarmadonSerialization, revert using SparkSession SparkSession is unavailable at this stage --- .../garmadon/spark/listener/GarmadonSparkListener.java | 9 +-------- .../schema/serialization/GarmadonSerialization.java | 2 ++ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java b/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java index 5f3e65cb..b6240757 100644 --- a/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java +++ b/frameworks/spark/src/main/java/com/criteo/hadoop/garmadon/spark/listener/GarmadonSparkListener.java @@ -6,7 +6,6 @@ import com.criteo.hadoop.garmadon.schema.events.Header; import org.apache.spark.SparkConf; import org.apache.spark.scheduler.*; -import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,13 +79,7 @@ private void sendExecutorStateEvent(long time, State state, String executorId, S } private void sendApplicationStateEvent(long time, State state) { - if (!SparkSession.getActiveSession().isDefined()) { - LOGGER.warn("Unable to retrieve a SparkSession, not recording application event."); - return; - } - - SparkSession session = SparkSession.getActiveSession().get(); - SparkConf conf = session.sparkContext().conf(); + SparkConf conf = new SparkConf(); String deployMode = conf.get("spark.submit.deployMode", "unknown"); int driverMemoryMb = (int) conf.getSizeAsMb("spark.driver.memory", "0"); diff --git a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java index 917370e1..db888fd9 100644 --- a/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java +++ b/schema/src/main/java/com/criteo/hadoop/garmadon/schema/serialization/GarmadonSerialization.java @@ -80,6 +80,8 @@ public interface TypeMarker { SparkEventProtos.ExecutorStorageStatus::toByteArray, SparkEventProtos.ExecutorStorageStatus::parseFrom); register(SparkEventProtos.RDDStorageStatus.class, TypeMarker.SPARK_RDD_STORAGE_STATUS_EVENT, "SPARK_RDD_STORAGE_STATUS_EVENT", SparkEventProtos.RDDStorageStatus::toByteArray, SparkEventProtos.RDDStorageStatus::parseFrom); + register(SparkEventProtos.ApplicationStateEvent.class, TypeMarker.SPARK_APPLICATION_STATE_EVENT, "SPARK_APPLICATION_STATE_EVENT", + SparkEventProtos.ApplicationStateEvent::toByteArray, SparkEventProtos.ApplicationStateEvent::parseFrom); // Flink events register(FlinkEventProtos.JobManagerEvent.class, TypeMarker.FLINK_JOB_MANAGER_EVENT, "FLINK_JOB_MANAGER_EVENT",