private Future<ComputerResult> submitWithExecutor(Executor exec)

in spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java [225:521]


    private Future<ComputerResult> submitWithExecutor(Executor exec) {
        // submit the computation to the computer service and capture the resulting future
        final Future<ComputerResult> result = computerService.submit(() -> {
            final long startTime = System.currentTimeMillis();
            //////////////////////////////////////////////////
            /////// PROCESS SHIM AND SYSTEM PROPERTIES ///////
            //////////////////////////////////////////////////
            ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
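            // select the Kryo shim service that matches the configured Spark serializer (unshaded Kryo for Spark's KryoSerializer, the Hadoop pool otherwise)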
            final String shimService = KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ?
                    UnshadedKryoShimService.class.getCanonicalName() :
                    HadoopPoolShimService.class.getCanonicalName();
            this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService);
            ///////////
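            // forward whitelisted TinkerPop keys to the driver and executors as "tinkerpop."-prefixed JVM system properties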
            final StringBuilder params = new StringBuilder();
            this.sparkConfiguration.getKeys().forEachRemaining(key -> {
                if (KEYS_PASSED_IN_JVM_SYSTEM_PROPERTIES.contains(key)) {
                    params.append(" -D").append("tinkerpop.").append(key).append("=").append(this.sparkConfiguration.getProperty(key));
                    System.setProperty("tinkerpop." + key, this.sparkConfiguration.getProperty(key).toString());
                }
            });
            if (params.length() > 0) {
                this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
                        (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
                this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
                        (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
            }
            KryoShimServiceLoader.applyConfiguration(this.sparkConfiguration);
            //////////////////////////////////////////////////
            //////////////////////////////////////////////////
            //////////////////////////////////////////////////
            // Apache Commons and Hadoop configurations that are used throughout the graph computer computation
            final org.apache.commons.configuration2.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
            if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) {
                graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
                if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR))
                    graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
            }
            graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
            final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
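            // determine how the graph will be read and written: Hadoop file formats (HDFS) versus persisted Spark RDDs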
            final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
            final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
            final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
            final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_PARTITIONER, false);
            final boolean skipPersist = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
            if (inputFromHDFS) {
                String inputLocation = Constants
                        .getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
                                fileSystemStorage).orElse(null);
                if (null != inputLocation) {
                    try {
                        graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
                                FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath()
                                        .toString());
                        hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
                                FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath()
                                        .toString());
                    } catch (final IOException e) {
                        throw new IllegalStateException(e.getMessage(), e);
                    }
                }
            }
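            // create the InputRDD that will load the graph (an InputFormatRDD wrapping a Hadoop InputFormat, or a custom InputRDD implementation)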
            final InputRDD inputRDD = SparkIOUtil.createInputRDD(hadoopConfiguration);
            final boolean filtered;
            // if the input class can filter on load, then set the filters
            if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
                GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
                filtered = false;
            } else if (inputRDD instanceof GraphFilterAware) {
                ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
                filtered = false;
            } else if (this.graphFilter.hasFilter()) {
                filtered = true;
            } else {
                filtered = false;
            }

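            // instantiate the OutputRDD that will write the computed graph (defaults to OutputFormatRDD when a Hadoop OutputFormat is configured)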
            final OutputRDD outputRDD;
            try {
                outputRDD = OutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class)) ?
                        hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, OutputRDD.class, OutputRDD.class).newInstance() :
                        OutputFormatRDD.class.newInstance();
            } catch (final InstantiationException | IllegalAccessException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }

            // create the spark context from the graph computer configuration
            final JavaSparkContext sparkContext = new JavaSparkContext(Spark.create(hadoopConfiguration));
            final Storage sparkContextStorage = SparkContextStorage.open();

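            // memory is only created when a vertex program is executed; it backs the distributed GraphComputer memory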
            SparkMemory memory = null;
            // delete output location
            final boolean dontDeleteNonEmptyOutput =
                    graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, false);
            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
            if (null != outputLocation) {
                if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
                    if (dontDeleteNonEmptyOutput) {
                        // DON'T delete the content if the folder is not empty
                        if (fileSystemStorage.ls(outputLocation).size() == 0) {
                            fileSystemStorage.rm(outputLocation);
                        } else {
                            throw new IllegalStateException("The output location '" + outputLocation + "' is not empty");
                        }
                    } else {
                        fileSystemStorage.rm(outputLocation);
                    }
                }
                if (outputToSpark && sparkContextStorage.exists(outputLocation))
                    sparkContextStorage.rm(outputLocation);
            }

            // the Spark application name will always be set by SparkContextStorage, thus, log the job name to make it easier to debug
            logger.debug(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");

            // execute the vertex program and map reducers; if there is a failure, auto-close the spark context
            try {
                this.loadJars(hadoopConfiguration, sparkContext); // add the project jars to the cluster
                updateLocalConfiguration(sparkContext, hadoopConfiguration);
                // create a message-passing friendly rdd from the input rdd
                boolean partitioned = false;
                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = SparkIOUtil.loadVertices(inputRDD, graphComputerConfiguration, sparkContext);
                // if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
                if (filtered) {
                    this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
                    loadedGraphRDD = SparkExecutor.applyGraphFilter(loadedGraphRDD, this.graphFilter);
                }
                // if the loaded graph RDD is already partitioned, use that partitioner; else partition it with HashPartitioner
                if (loadedGraphRDD.partitioner().isPresent())
                    this.logger.debug("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
                else {
                    if (!skipPartitioner) {
                        final Partitioner partitioner = new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
                        this.logger.debug("Partitioning the loaded graphRDD: " + partitioner);
                        loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
                        partitioned = true;
                        assert loadedGraphRDD.partitioner().isPresent();
                    } else {
                        assert skipPartitioner == !loadedGraphRDD.partitioner().isPresent(); // no easy way to test this with a test case
                        this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + GREMLIN_SPARK_SKIP_PARTITIONER);
                    }
                }
                // if the loaded graphRDD was already partitioned previously, then this coalesce/repartition will not take place
                if (this.workersSet) {
                    if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loaded graphRDD does not have more partitions than workers
                        loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
                    else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the loaded graphRDD does not have fewer partitions than workers
                        loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
                }
                // persist the vertex-program-loaded graph at the storage level specified by configuration (default MEMORY_ONLY)
                if (!skipPersist && (!inputFromSpark || partitioned || filtered))
                    loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));

                // final graph with view (for persisting and/or mapReducing -- may remain null, saving space/time)
                JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
                ////////////////////////////////
                // process the vertex program //
                ////////////////////////////////
                if (null != this.vertexProgram) {
                    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                    // build a shortcut for CloneVertexProgram (which does nothing): intercepting it reduces the total
                    // Spark stages from 3 to 2, which improves overall performance considerably
                    if (this.vertexProgram.getClass().equals(CloneVertexProgram.class) &&
                        !graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
                        graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkCloneVertexProgramInterceptor.class.getName());
                    }
                    /////////////////
                    // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
                    if (graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
                        try {
                            final SparkVertexProgramInterceptor<VertexProgram> interceptor =
                                    (SparkVertexProgramInterceptor) Class.forName(graphComputerConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
                            computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
                        } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                            throw new IllegalStateException(e.getMessage(), e);
                        }
                    } else {  // standard GraphComputer semantics
                        // get a configuration that will be propagated to all workers
                        final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                        this.vertexProgram.storeState(vertexProgramConfiguration);
                        // set up the vertex program and wire up configurations
                        this.vertexProgram.setup(memory);
                        JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
                        memory.broadcastMemory(sparkContext);
                        // execute the vertex program
                        while (true) {
                            if (Thread.interrupted()) {
                                sparkContext.cancelAllJobs();
                                throw new TraversalInterruptedException();
                            }
                            memory.setInExecute(true);
                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
                            memory.setInExecute(false);
                            if (this.vertexProgram.terminate(memory))
                                break;
                            else {
                                memory.incrIteration();
                                memory.broadcastMemory(sparkContext);
                            }
                        }
                        // if the graph will continue to be used (persisted or map-reduced), then generate a view+graph
                        if ((null != outputRDD && !this.persist.equals(Persist.NOTHING)) || !this.mapReducers.isEmpty()) {
                            computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
                            assert null != computedGraphRDD && computedGraphRDD != loadedGraphRDD;
                        } else {
                            // ensure that the computedGraphRDD was not created
                            assert null == computedGraphRDD;
                        }
                    }
                    /////////////////
                    memory.complete(); // drop all transient memory keys
                    // write the computed graph to the respective output (rdd or output format)
                    if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
                        assert null != computedGraphRDD; // the logic holds that a computedGraphRDD must exist at this point
                        outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
                    }
                }

                final boolean computedGraphCreated = computedGraphRDD != null && computedGraphRDD != loadedGraphRDD;
                if (!computedGraphCreated)
                    computedGraphRDD = loadedGraphRDD;

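                // seed the final memory from the vertex program's memory (or an empty MapMemory when no vertex program was run)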
                final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);

                //////////////////////////////
                // process the map reducers //
                //////////////////////////////
                if (!this.mapReducers.isEmpty()) {
                    // create a mapReduceRDD for executing the map reduce jobs on
                    JavaPairRDD<Object, VertexWritable> mapReduceRDD = computedGraphRDD;
                    if (computedGraphCreated && !outputToSpark) {
                        // drop all the edges of the graph as they are not used in mapReduce processing
                        mapReduceRDD = computedGraphRDD.mapValues(vertexWritable -> {
                            vertexWritable.get().dropEdges(Direction.BOTH);
                            return vertexWritable;
                        });
                        // if there is only one MapReduce to execute, don't bother persisting; it would waste clock cycles.
                        if (this.mapReducers.size() > 1)
                            mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
                    }

                    for (final MapReduce mapReduce : this.mapReducers) {
                        // execute the map reduce job
                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(graphComputerConfiguration);
                        mapReduce.storeState(newApacheConfiguration);
                        // map
                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceRDD, mapReduce, newApacheConfiguration);
                        // combine
                        final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
                        // reduce
                        final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
                        // write the map reduce output back to disk and computer result memory
                        if (null != outputRDD)
                            mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                    }
                    // if the mapReduceRDD is not simply the computed graph, unpersist the mapReduceRDD
                    if (computedGraphCreated && !outputToSpark) {
                        assert loadedGraphRDD != computedGraphRDD;
                        assert mapReduceRDD != computedGraphRDD;
                        mapReduceRDD.unpersist();
                    } else {
                        assert mapReduceRDD == computedGraphRDD;
                    }
                }

                // unpersist the loaded graph if it will not be used again (no PersistedInputRDD)
                // if the graphRDD was loaded from Spark, but then partitioned or filtered, it's a different RDD
                if (!inputFromSpark || partitioned || filtered)
                    loadedGraphRDD.unpersist();
                // unpersist the computed graph if it will not be used again (no PersistedOutputRDD)
                // if the computed graph is just the loadedGraphRDD (it was never mutated), don't unpersist the computedGraphRDD/loadedGraphRDD here
                if ((!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING)) && computedGraphCreated)
                    computedGraphRDD.unpersist();
                // delete any file system or rdd data if persist nothing
                if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
                    if (outputToHDFS)
                        fileSystemStorage.rm(outputLocation);
                    if (outputToSpark)
                        sparkContextStorage.rm(outputLocation);
                }
                // update runtime and return the newly computed graph
                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                // clear properties that should not be propagated in an OLAP chain
                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
                graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_GRAPH_CACHE);
                graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_PARTITIONER);
                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
            } finally {
                if (!graphComputerConfiguration.getBoolean(GREMLIN_SPARK_PERSIST_CONTEXT, false))
                    Spark.close();
            }
        });
        computerService.shutdown();
        return result;
    }
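
For context, this method is not called directly by users; it is reached through the standard GraphComputer API. The sketch below shows that entry path under typical assumptions: the properties file name is illustrative, the PageRank program is just one example of a VertexProgram, and exception handling is omitted.

    // minimal usage sketch (hypothetical config file; any VertexProgram would do)
    Graph graph = GraphFactory.open("conf/hadoop/hadoop-gryo.properties");
    Future<ComputerResult> future = graph.compute(SparkGraphComputer.class)
            .program(PageRankVertexProgram.build().create(graph))
            .result(GraphComputer.ResultGraph.NEW)
            .persist(GraphComputer.Persist.EDGES)
            .submit();                                   // eventually funnels into submitWithExecutor(...)
    final ComputerResult result = future.get();          // block until the ComputerResult built above is ready
    System.out.println(result.memory().getRuntime());    // runtime recorded via finalMemory.setRuntime(...)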