in wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java [116:173]
public SparkContextReference getSparkContext(Job job) {
//System.out.println("In Job with "+job.isMontiorWithHackIT());
//System.exit(0);
// NB: There must be only one JavaSparkContext per JVM. Therefore, it is not local to the executor.
final SparkConf sparkConf;
final Configuration configuration = job.getConfiguration();
if (this.sparkContextReference != null && !this.sparkContextReference.isDisposed()) {
final JavaSparkContext sparkContext = this.sparkContextReference.get();
this.logger.warn(
"There is already a SparkContext (master: {}): , which will be reused. " +
"Not all settings might be effective.", sparkContext.getConf().get("spark.master"));
sparkConf = sparkContext.getConf();
} else {
sparkConf = new SparkConf(true);
}
for (String property : REQUIRED_SPARK_PROPERTIES) {
sparkConf.set(property, configuration.getStringProperty(property));
}
for (String property : OPTIONAL_SPARK_PROPERTIES) {
configuration.getOptionalStringProperty(property).ifPresent(
value -> sparkConf.set(property, value)
);
}
if (job.getName() != null) {
sparkConf.set("spark.app.name", job.getName());
}
// sparkConf.set("spark.extraListeners","org.apache.wayang.monitoring.spark.SparkListener");
if (this.sparkContextReference == null || this.sparkContextReference.isDisposed()) {
this.sparkContextReference = new SparkContextReference(job.getCrossPlatformExecutor(), new JavaSparkContext(sparkConf));
}
final JavaSparkContext sparkContext = this.sparkContextReference.get();
//SparkContext sc= sparkContext.sc();
if(job.isMontiorWithHackIT()) {
sparkConf.set("spark.extraListeners","org.apache.wayang.spark.monitoring.spark_monitoring.SparkListener");
sparkContext.sc().addSparkListener(new SparkListener());
}
if (!sparkContext.isLocal()) {
// Add Wayang JAR files.
this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(SparkPlatform.class)); // wayang-spark
this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(WayangBasic.class)); // wayang-basic
this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(WayangContext.class)); // wayang-core
final Set<String> udfJarPaths = job.getUdfJarPaths();
if (udfJarPaths.isEmpty()) {
this.logger.warn("Non-local SparkContext but not UDF JARs have been declared.");
} else {
udfJarPaths.forEach(this::registerJarIfNotNull);
}
}
return this.sparkContextReference;
}