public SparkContextReference getSparkContext()

in wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java [116:173]


    /**
     * Provides a {@link SparkContextReference} for the given {@link Job}, creating a fresh
     * {@link JavaSparkContext} only if none exists yet or the previous one has been disposed.
     * <p>
     * NB: There must be only one {@code JavaSparkContext} per JVM. Therefore, the reference is
     * kept as platform-level state and is not local to an executor.
     *
     * @param job the {@link Job} whose {@link Configuration} supplies the Spark properties and
     *            whose UDF JARs are registered for non-local execution
     * @return the (possibly reused) {@link SparkContextReference}
     */
    public SparkContextReference getSparkContext(Job job) {
        final SparkConf sparkConf;
        final Configuration configuration = job.getConfiguration();

        if (this.sparkContextReference != null && !this.sparkContextReference.isDisposed()) {
            // Reuse the live context; properties set from here on may not take effect anymore.
            final JavaSparkContext sparkContext = this.sparkContextReference.get();
            this.logger.warn(
                    "There is already a SparkContext (master: {}), which will be reused. " +
                            "Not all settings might be effective.", sparkContext.getConf().get("spark.master"));
            sparkConf = sparkContext.getConf();
        } else {
            sparkConf = new SparkConf(true);
        }

        // Transfer mandatory and optional Spark properties from the Wayang configuration.
        for (String property : REQUIRED_SPARK_PROPERTIES) {
            sparkConf.set(property, configuration.getStringProperty(property));
        }
        for (String property : OPTIONAL_SPARK_PROPERTIES) {
            configuration.getOptionalStringProperty(property).ifPresent(
                    value -> sparkConf.set(property, value)
            );
        }

        if (job.getName() != null) {
            sparkConf.set("spark.app.name", job.getName());
        }
        if (this.sparkContextReference == null || this.sparkContextReference.isDisposed()) {
            this.sparkContextReference = new SparkContextReference(job.getCrossPlatformExecutor(), new JavaSparkContext(sparkConf));
        }
        final JavaSparkContext sparkContext = this.sparkContextReference.get();

        if (job.isMontiorWithHackIT()) {
            // NOTE(review): setting "spark.extraListeners" here presumably has no effect on an
            // already-created SparkContext (Spark reads it at startup) — confirm; the listener is
            // therefore also registered programmatically below, which does take effect.
            sparkConf.set("spark.extraListeners", "org.apache.wayang.spark.monitoring.spark_monitoring.SparkListener");
            sparkContext.sc().addSparkListener(new SparkListener());
        }
        if (!sparkContext.isLocal()) {
            // Ship the Wayang JARs to the cluster so that operators and UDFs can be resolved remotely.
            this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(SparkPlatform.class)); // wayang-spark
            this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(WayangBasic.class)); // wayang-basic
            this.registerJarIfNotNull(ReflectionUtils.getDeclaringJar(WayangContext.class)); // wayang-core
            final Set<String> udfJarPaths = job.getUdfJarPaths();
            if (udfJarPaths.isEmpty()) {
                this.logger.warn("Non-local SparkContext, but no UDF JARs have been declared.");
            } else {
                udfJarPaths.forEach(this::registerJarIfNotNull);
            }
        }

        return this.sparkContextReference;
    }