in tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java [140:259]
public Future<ComputerResult> submit() {
    // Validates the configured computation on the calling thread, then hands the actual
    // vertex-program / map-reduce execution to computerService and returns its Future.
    // Validation failures are thrown synchronously; failures during execution are thrown
    // inside the submitted task.

    // a graph computer can only be executed once
    if (this.executed)
        throw Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
    this.executed = true;

    this.validateAndPrepareSubmission();

    final Future<ComputerResult> result = computerService.submit(() -> {
        final long time = System.currentTimeMillis();
        final TinkerGraphComputerView view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, null != this.vertexProgram ? this.vertexProgram.getVertexComputeKeys() : Collections.emptySet());
        // NOTE: this local pool shadows the int field this.workers (the requested worker count)
        final TinkerWorkerPool workers = new TinkerWorkerPool(this.graph, this.memory, this.workers);
        try {
            if (null != this.vertexProgram) {
                // execute the vertex program
                this.runVertexProgramIterations(workers);
                view.complete(); // drop all transient vertex compute keys
            }
            // execute mapreduce jobs
            for (final MapReduce mapReduce : this.mapReducers) {
                this.runMapReduceJob(workers, mapReduce);
            }
            // update runtime and return the newly computed graph
            this.memory.setRuntime(System.currentTimeMillis() - time);
            this.memory.complete(); // drop all transient properties and set iteration
            // determine the resultant graph based on the result graph/persist state
            final Graph computedGraph = view.processResultGraphPersist(this.resultGraph, this.persist);
            // NOTE(review): the view is only dropped here, on the success path. If anything above
            // throws, the view created at the top of this task is never dropped from this.graph.
            // Confirm whether that is intended before changing it: dropping it in a failure path
            // could race with worker threads that are still winding down after closeNow().
            TinkerHelper.dropGraphComputerView(this.graph); // drop the view from the original source graph
            return new DefaultComputerResult(computedGraph, this.memory.asImmutable());
        } catch (InterruptedException ie) {
            // NOTE(review): the interrupt is translated into TraversalInterruptedException without
            // re-asserting the thread's interrupt flag - presumably fine because this runs on the
            // computer's own executor thread; confirm.
            workers.closeNow();
            throw new TraversalInterruptedException();
        } catch (Exception ex) {
            // every other failure (including the TraversalInterruptedException raised by the
            // Thread.interrupted() checks) is wrapped in a RuntimeException
            workers.closeNow();
            throw new RuntimeException(ex);
        } finally {
            // runs on both success and failure; on failure closeNow() has already been called
            workers.close();
        }
    });
    // presumably an ExecutorService: shutdown() then only rejects new tasks while the task
    // submitted above still runs to completion - TODO confirm the field's declared type
    this.computerService.shutdown();
    return result;
}

/**
 * Performs the synchronous pre-flight checks for {@link #submit()} and resolves the state the
 * computation will run with: the map reducers contributed by the vertex program, the effective
 * result graph / persist settings, and a fresh {@code TinkerMemory}.
 * <p>
 * Throws the corresponding {@code GraphComputer.Exceptions} error when there is nothing to run,
 * when the result graph / persist combination is unsupported, or when more workers are requested
 * than {@code features().getMaxWorkers()} allows.
 */
private void validateAndPrepareSubmission() {
    // it is not possible to execute a computer if it has no vertex program nor mapreducers
    if (null == this.vertexProgram && this.mapReducers.isEmpty())
        throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
    // it is possible to run mapreducers without a vertex program
    if (null != this.vertexProgram) {
        GraphComputerHelper.validateProgramOnComputer(this, this.vertexProgram);
        this.mapReducers.addAll(this.vertexProgram.getMapReducers());
    }
    // get the result graph and persist state to use for the computation
    this.resultGraph = GraphComputerHelper.getResultGraphState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.resultGraph));
    this.persist = GraphComputerHelper.getPersistState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.persist));
    if (!this.features().supportsResultGraphPersistCombination(this.resultGraph, this.persist))
        throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraph, this.persist);
    // ensure requested workers are not larger than supported workers
    if (this.workers > this.features().getMaxWorkers())
        throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());
    // initialize the memory
    this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
}

/**
 * Sets up the vertex program and runs it iteration by iteration on the given worker pool until
 * {@code vertexProgram.terminate(memory)} returns {@code true}. Must only be called when
 * {@code this.vertexProgram} is non-null.
 *
 * @param workerPool the pool whose workers execute the program over the vertices
 * @throws TraversalInterruptedException if the coordinating thread or a worker observes an interrupt
 * @throws Exception whatever the worker pool propagates from an iteration
 */
private void runVertexProgramIterations(final TinkerWorkerPool workerPool) throws Exception {
    this.vertexProgram.setup(this.memory);
    while (true) {
        if (Thread.interrupted()) throw new TraversalInterruptedException();
        this.memory.completeSubRound();
        workerPool.setVertexProgram(this.vertexProgram);
        workerPool.executeVertexProgram((vertices, vertexProgram, workerMemory) -> {
            vertexProgram.workerIterationStart(workerMemory.asImmutable());
            while (vertices.hasNext()) {
                final Vertex vertex = vertices.next();
                // checked once per vertex so a long iteration can be abandoned promptly
                if (Thread.interrupted()) throw new TraversalInterruptedException();
                vertexProgram.execute(
                        ComputerGraph.vertexProgram(vertex, vertexProgram),
                        new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
                        workerMemory);
            }
            vertexProgram.workerIterationEnd(workerMemory.asImmutable());
            workerMemory.complete();
        });
        this.messageBoard.completeIteration();
        this.memory.completeSubRound();
        // the iteration counter advances whether or not this was the last iteration,
        // but only after terminate() has been consulted
        final boolean terminated = this.vertexProgram.terminate(this.memory);
        this.memory.incrIteration();
        if (terminated) break;
    }
}

/**
 * Runs a single map/reduce job on the given worker pool: the MAP stage over all vertices of
 * {@code this.graph}, then - only if the job declares a REDUCE stage - the REDUCE stage over the
 * map emitter's {@code reduceMap} entries. The final output is handed to
 * {@code mapReduce.addResultToMemory}.
 *
 * @param workerPool the pool whose workers execute the stages
 * @param mapReduce  the job to run
 * @throws TraversalInterruptedException if a worker observes an interrupt
 * @throws Exception whatever the worker pool propagates from a stage
 */
private void runMapReduceJob(final TinkerWorkerPool workerPool, final MapReduce mapReduce) throws Exception {
    final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
    // a single iterator shared by all workers; next() yields null once it is exhausted
    final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
    workerPool.setMapReduce(mapReduce);
    workerPool.executeMapReduce(workerMapReduce -> {
        workerMapReduce.workerStart(MapReduce.Stage.MAP);
        while (true) {
            if (Thread.interrupted()) throw new TraversalInterruptedException();
            final Vertex vertex = vertices.next();
            if (null == vertex) break;
            workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
        }
        workerMapReduce.workerEnd(MapReduce.Stage.MAP);
    });
    // sort results if a map output sort is defined
    mapEmitter.complete(mapReduce);
    // no need to run combiners as this is single machine
    if (!mapReduce.doStage(MapReduce.Stage.REDUCE)) {
        // map-only job: the map output is the final result
        mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
        return;
    }
    final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
    // raw types are kept as in the original: the emitter's wildcard-typed entries cannot be
    // cast to Iterator<Map.Entry<?, Queue<?>>> directly
    final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
    workerPool.executeMapReduce(workerMapReduce -> {
        workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
        while (true) {
            if (Thread.interrupted()) throw new TraversalInterruptedException();
            final Map.Entry<?, Queue<?>> entry = keyValues.next();
            if (null == entry) break;
            workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
        }
        workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
    });
    reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
    mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
}