in spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java [73:163]
/*
 * Evaluates a traversal of the shape "GraphStep, <inner steps>, ReducingBarrierStep" with plain
 * Spark RDD operations:
 *
 *   1. The pure traversal is cloned, its first step (the GraphStep) and last step (the
 *      ReducingBarrierStep) are removed, and the remainder is recompiled without
 *      ComputerVerificationStrategy and ComputerFinalizationStrategy.
 *   2. Every vertex of inputRDD that passes the GraphStep id filter is pushed through its own
 *      clone of the remaining steps, yielding an RDD of traversers.
 *   3. That RDD is reduced with a Spark fold chosen by the concrete type of the removed end step
 *      (count, sum, mean, min, max, fold, group, groupCount).
 *   4. A non-null result is wrapped in a single traverser of bulk 1 and stored in memory under
 *      TraversalVertexProgram.HALTED_TRAVERSERS; the memory iteration is then incremented.
 *
 * The start and end steps are cast to GraphStep / ReducingBarrierStep without a check, so the
 * caller must only hand in traversals of that shape.
 *
 * @param vertexProgram the traversal vertex program whose traversal is evaluated
 * @param inputRDD      the graph as (id, vertex) pairs; it is read but never modified
 * @param memory        receives side-effects, the halted traverser and the iteration increment
 * @return inputRDD itself, unchanged
 * @throws IllegalArgumentException if the end step is not one of the supported reducing barriers
 */
public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
    vertexProgram.setup(memory);
    final Traversal.Admin<Vertex, Object> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
    final GraphStep<Vertex, Vertex> graphStep = ((GraphStep) traversal.getStartStep());
    final Object[] graphStepIds = graphStep.getIds(); // any V(1,2,3)-style ids to filter on
    final ReducingBarrierStep endStep = (ReducingBarrierStep) traversal.getEndStep(); // needed for the final traverser generation
    traversal.removeStep(0); // remove GraphStep
    traversal.removeStep(traversal.getSteps().size() - 1); // remove ReducingBarrierStep
    traversal.setStrategies(traversal.clone().getStrategies().removeStrategies(ComputerVerificationStrategy.class, ComputerFinalizationStrategy.class)); // no longer a computer job, but parallel standard jobs
    traversal.applyStrategies(); // compile
    final boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
    ///////////////////////////////
    MemoryTraversalSideEffects.setMemorySideEffects(traversal, memory, ProgramPhase.EXECUTE); // any intermediate sideEffect steps are backed by SparkMemory
    memory.setInExecute(true);
    final Object result;
    try {
        final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
                .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
                .flatMap(vertexWritable -> {
                    if (identityTraversal) { // g.V.count()-style (identity)
                        return IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1L));
                    } else { // add the vertex to head of the traversal
                        final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
                        clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1L));
                        return (Step) clone.getEndStep(); // the end step is itself the iterator over this vertex's results
                    }
                });
        // USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL
        if (endStep instanceof CountGlobalStep) {
            result = nextRDD.map(Traverser::bulk).fold(0L, (a, b) -> a + b);
        } else if (endStep instanceof SumGlobalStep) {
            // an empty input produces no result at all (null) rather than a zero sum
            result = nextRDD.isEmpty() ? null : nextRDD
                    .map(traverser -> {
                        final Number n = (Number) traverser.get();
                        // weight the value by its bulk, coercing the bulk to the value's own numeric type
                        final Class<? extends Number> clazz = null == n ? Long.class : n.getClass();
                        return NumberHelper.mul(n, NumberHelper.coerceTo(traverser.bulk(), clazz));
                    }).fold(0, NumberHelper::add);
        } else if (endStep instanceof MeanGlobalStep) {
            result = nextRDD.isEmpty() ? null : nextRDD
                    .map(traverser -> new MeanGlobalStep.MeanNumber((Number) traverser.get(), traverser.bulk()))
                    .fold(MeanNumberSupplier.instance().get(), MeanGlobalStep.MeanNumber::add)
                    .getFinal();
        } else if (endStep instanceof MinGlobalStep) {
            result = nextRDD.isEmpty() ? null : nextRDD
                    .map(traverser -> (Comparable) traverser.get())
                    .fold(Double.NaN, NumberHelper::min);
        } else if (endStep instanceof MaxGlobalStep) {
            result = nextRDD.isEmpty() ? null : nextRDD
                    .map(traverser -> (Comparable) traverser.get())
                    .fold(Double.NaN, NumberHelper::max);
        } else if (endStep instanceof FoldStep) {
            final BinaryOperator biOperator = endStep.getBiOperator();
            // decided once up front instead of once per traverser inside the lambda
            final boolean foldIntoList = endStep.getSeedSupplier() instanceof ArrayListSupplier;
            result = nextRDD.map(traverser -> {
                if (foldIntoList) {
                    // expand the bulk so the value appears bulk-many times in the list
                    final List list = new ArrayList<>();
                    for (long i = 0; i < traverser.bulk(); i++) {
                        list.add(traverser.get());
                    }
                    return list;
                } else {
                    return traverser.get();
                }
            }).fold(endStep.getSeedSupplier().get(), biOperator::apply);
        } else if (endStep instanceof GroupStep) {
            final GroupStep.GroupBiOperator<Object, Object> biOperator = (GroupStep.GroupBiOperator) endStep.getBiOperator();
            result = ((GroupStep) endStep).generateFinalResult(nextRDD.
                    mapPartitions(partitions -> {
                        // one step clone per partition is used to project that partition's traversers
                        final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone();
                        return IteratorUtils.map(partitions, clone::projectTraverser);
                    }).fold(((GroupStep<Object, Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply));
        } else if (endStep instanceof GroupCountStep) {
            final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance();
            result = nextRDD
                    .mapPartitions(partitions -> {
                        // one step clone per partition is used to project that partition's traversers
                        final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone();
                        return IteratorUtils.map(partitions, clone::projectTraverser);
                    })
                    .fold(((GroupCountStep<Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply);
        } else {
            throw new IllegalArgumentException("The end step is an unsupported barrier: " + endStep);
        }
    } finally {
        // always leave execute mode, even when the Spark job fails or the barrier is unsupported
        memory.setInExecute(false);
    }
    ///////////////////////////////
    // generate the HALTED_TRAVERSERS for the memory
    if (result != null) { // a null result leaves HALTED_TRAVERSERS as it was
        final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
        haltedTraversers.add(traversal.getTraverserGenerator().generate(result, endStep, 1L)); // all reducing barrier steps produce a result of bulk 1
        memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
    }
    memory.incrIteration(); // any local star graph reduction takes a single iteration
    return inputRDD;
}