in spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java [225:521]
private Future<ComputerResult> submitWithExecutor(Executor exec) {
// submit the computation to the computer service and hand back the resulting future
final Future<ComputerResult> result = computerService.submit(() -> {
final long startTime = System.currentTimeMillis();
//////////////////////////////////////////////////
/////// PROCESS SHIM AND SYSTEM PROPERTIES ///////
//////////////////////////////////////////////////
ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
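// select the Kryo shim service to match the configured Spark serializer: the unshaded shim when Spark's
// KryoSerializer is in use, otherwise the HadoopPool-based shim (wired up below via KryoShimServiceLoader)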
final String shimService = KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ?
UnshadedKryoShimService.class.getCanonicalName() :
HadoopPoolShimService.class.getCanonicalName();
this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService);
///////////
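// forward the whitelisted configuration keys to the driver and executor JVMs as "tinkerpop."-prefixed
// -D system properties by appending them to the respective extraJavaOptions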
final StringBuilder params = new StringBuilder();
this.sparkConfiguration.getKeys().forEachRemaining(key -> {
if (KEYS_PASSED_IN_JVM_SYSTEM_PROPERTIES.contains(key)) {
params.append(" -D").append("tinkerpop.").append(key).append("=").append(this.sparkConfiguration.getProperty(key));
System.setProperty("tinkerpop." + key, this.sparkConfiguration.getProperty(key).toString());
}
});
if (params.length() > 0) {
this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
(this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
(this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
}
KryoShimServiceLoader.applyConfiguration(this.sparkConfiguration);
//////////////////////////////////////////////////
//////////////////////////////////////////////////
//////////////////////////////////////////////////
// Apache Commons and Hadoop configurations that are used throughout the graph computer computation
final org.apache.commons.configuration2.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
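// default to Spark's KryoSerializer with the Gryo registrator when no serializer has been configured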
if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) {
graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR))
graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
}
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
final boolean skipPartitioner = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_PARTITIONER, false);
final boolean skipPersist = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
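// determine whether graph input/output is file based (Hadoop input/output formats on HDFS) or RDD based
// (persisted in the Spark context), and whether partitioning/caching of the loaded graph should be skipped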
if (inputFromHDFS) {
String inputLocation = Constants
.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
fileSystemStorage).orElse(null);
if (null != inputLocation) {
try {
graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath()
.toString());
hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath()
.toString());
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
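// construct the InputRDD that will load the graph (either a native InputRDD or an InputFormatRDD wrapping the configured Hadoop InputFormat)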
final InputRDD inputRDD = SparkIOUtil.createInputRDD(hadoopConfiguration);
final boolean filtered;
// if the input class can filter on load, then set the filters
if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
filtered = false;
} else if (inputRDD instanceof GraphFilterAware) {
((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
filtered = false;
} else if (this.graphFilter.hasFilter()) {
filtered = true;
} else {
filtered = false;
}
final OutputRDD outputRDD;
try {
outputRDD = OutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class)) ?
hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, OutputRDD.class, OutputRDD.class).newInstance() :
OutputFormatRDD.class.newInstance();
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// create the spark context from the graph computer configuration
final JavaSparkContext sparkContext = new JavaSparkContext(Spark.create(hadoopConfiguration));
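// Spark.create() returns the shared SparkContext managed by the Spark helper class; it is only closed in the
// finally block below when GREMLIN_SPARK_PERSIST_CONTEXT is not enabled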
final Storage sparkContextStorage = SparkContextStorage.open();
SparkMemory memory = null;
// delete output location
final boolean dontDeleteNonEmptyOutput =
graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, false);
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
if (null != outputLocation) {
if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
if (dontDeleteNonEmptyOutput) {
// DON'T delete the content if the folder is not empty
if (fileSystemStorage.ls(outputLocation).size() == 0) {
fileSystemStorage.rm(outputLocation);
} else {
throw new IllegalStateException("The output location '" + outputLocation + "' is not empty");
}
} else {
fileSystemStorage.rm(outputLocation);
}
}
if (outputToSpark && sparkContextStorage.exists(outputLocation))
sparkContextStorage.rm(outputLocation);
}
// the Spark application name will always be set by SparkContextStorage, thus, log the job name to make it easier to debug
logger.debug(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
// execute the vertex program and map reducers and if there is a failure, auto-close the spark context
try {
this.loadJars(hadoopConfiguration, sparkContext); // add the project jars to the cluster
updateLocalConfiguration(sparkContext, hadoopConfiguration);
// create a message-passing friendly rdd from the input rdd
boolean partitioned = false;
JavaPairRDD<Object, VertexWritable> loadedGraphRDD = SparkIOUtil.loadVertices(inputRDD, graphComputerConfiguration, sparkContext);
// if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
if (filtered) {
this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
loadedGraphRDD = SparkExecutor.applyGraphFilter(loadedGraphRDD, this.graphFilter);
}
// if the loaded graph RDD is already partitioned use that partitioner, else partition it with HashPartitioner
if (loadedGraphRDD.partitioner().isPresent())
this.logger.debug("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
else {
if (!skipPartitioner) {
final Partitioner partitioner = new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
this.logger.debug("Partitioning the loaded graphRDD: " + partitioner);
loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
partitioned = true;
assert loadedGraphRDD.partitioner().isPresent();
} else {
assert skipPartitioner == !loadedGraphRDD.partitioner().isPresent(); // no easy way to test this with a test case
this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + GREMLIN_SPARK_SKIP_PARTITIONER);
}
}
// if the loaded graphRDD was already partitioned previously, then this coalesce/repartition will not take place
if (this.workersSet) {
if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loaded graphRDD does not have more partitions than workers
loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the loaded graphRDD does not have fewer partitions than workers
loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
}
// persist the vertex program loaded graph at the storage level specified by configuration, or else use the default cache() level of MEMORY_ONLY
if (!skipPersist && (!inputFromSpark || partitioned || filtered))
loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
// final graph with view (for persisting and/or mapReducing -- may be null and thus, possible to save space/time)
JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
////////////////////////////////
// process the vertex program //
////////////////////////////////
if (null != this.vertexProgram) {
memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
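// SparkMemory implements the GraphComputer Memory contract for this job; its state is broadcast to the workers between iterations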
// build a shortcut (which reduces the total Spark stages from 3 to 2) for CloneVertexProgram since it does nothing;
// this improves the overall performance considerably
if (this.vertexProgram.getClass().equals(CloneVertexProgram.class) &&
!graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkCloneVertexProgramInterceptor.class.getName());
}
/////////////////
// if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
if (graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
try {
final SparkVertexProgramInterceptor<VertexProgram> interceptor =
(SparkVertexProgramInterceptor) Class.forName(graphComputerConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
} catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
throw new IllegalStateException(e.getMessage(), e);
}
} else { // standard GraphComputer semantics
// get a configuration that will be propagated to all workers
final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
this.vertexProgram.storeState(vertexProgramConfiguration);
// set up the vertex program and wire up configurations
this.vertexProgram.setup(memory);
JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
memory.broadcastMemory(sparkContext);
// execute the vertex program
while (true) {
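// one BSP iteration: execute the vertex program against the graph, then ask the program whether to terminate;
// if not, bump the iteration counter and re-broadcast the updated memory to the workers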
if (Thread.interrupted()) {
sparkContext.cancelAllJobs();
throw new TraversalInterruptedException();
}
memory.setInExecute(true);
viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
memory.setInExecute(false);
if (this.vertexProgram.terminate(memory))
break;
else {
memory.incrIteration();
memory.broadcastMemory(sparkContext);
}
}
// if the graph will continue to be used (persisted or map reduced), then generate a view+graph
if ((null != outputRDD && !this.persist.equals(Persist.NOTHING)) || !this.mapReducers.isEmpty()) {
computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
assert null != computedGraphRDD && computedGraphRDD != loadedGraphRDD;
} else {
// ensure that the computedGraphRDD was not created
assert null == computedGraphRDD;
}
}
/////////////////
memory.complete(); // drop all transient memory keys
// write the computed graph to the respective output (rdd or output format)
if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
assert null != computedGraphRDD; // the logic holds that a computedGraphRDD must be created at this point
outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
}
}
final boolean computedGraphCreated = computedGraphRDD != null && computedGraphRDD != loadedGraphRDD;
if (!computedGraphCreated)
computedGraphRDD = loadedGraphRDD;
final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
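// snapshot the distributed memory into a local MapMemory for the final result (an empty MapMemory if no vertex program ran)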
//////////////////////////////
// process the map reducers //
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
// create a mapReduceRDD for executing the map reduce jobs on
JavaPairRDD<Object, VertexWritable> mapReduceRDD = computedGraphRDD;
if (computedGraphCreated && !outputToSpark) {
// drop all the edges of the graph as they are not used in mapReduce processing
mapReduceRDD = computedGraphRDD.mapValues(vertexWritable -> {
vertexWritable.get().dropEdges(Direction.BOTH);
return vertexWritable;
});
// if there is only one MapReduce to execute, don't bother wasting the clock cycles.
if (this.mapReducers.size() > 1)
mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
}
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(graphComputerConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceRDD, mapReduce, newApacheConfiguration);
// combine
final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
// reduce
final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
// write the map reduce output back to disk and computer result memory
if (null != outputRDD)
mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
}
// if the mapReduceRDD is not simply the computed graph, unpersist the mapReduceRDD
if (computedGraphCreated && !outputToSpark) {
assert loadedGraphRDD != computedGraphRDD;
assert mapReduceRDD != computedGraphRDD;
mapReduceRDD.unpersist();
} else {
assert mapReduceRDD == computedGraphRDD;
}
}
// unpersist the loaded graph if it will not be used again (no PersistedInputRDD)
// if the graphRDD was loaded from Spark, but then partitioned or filtered, it's a different RDD
if (!inputFromSpark || partitioned || filtered)
loadedGraphRDD.unpersist();
// unpersist the computed graph if it will not be used again (no PersistedOutputRDD)
// if the computed graph is the loadedGraphRDD (because it was not mutated and thus not unpersisted), then don't unpersist the computedGraphRDD/loadedGraphRDD
if ((!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING)) && computedGraphCreated)
computedGraphRDD.unpersist();
// delete any file system or rdd data if persist nothing
if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
if (outputToHDFS)
fileSystemStorage.rm(outputLocation);
if (outputToSpark)
sparkContextStorage.rm(outputLocation);
}
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
// clear properties that should not be propagated in an OLAP chain
graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_GRAPH_CACHE);
graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_PARTITIONER);
return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
} finally {
if (!graphComputerConfiguration.getBoolean(GREMLIN_SPARK_PERSIST_CONTEXT, false))
Spark.close();
}
});
computerService.shutdown();
return result;
}