public JavaPairRDD apply()

in spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java [73:163]


    /**
     * Executes a star-local traversal that ends in a {@link ReducingBarrierStep} as plain Spark jobs instead of as
     * vertex-program message passing.
     * <p>
     * The traversal's leading {@link GraphStep} and trailing reducing barrier are removed, and the remaining steps are
     * recompiled without {@code ComputerVerificationStrategy}/{@code ComputerFinalizationStrategy}. A fresh clone of
     * them is then run for every input vertex whose id passes the {@code V(...)} id filter. The resulting traversers are
     * reduced with the Spark operation that matches the barrier type. A non-null result is stored in memory as a single
     * halted traverser of bulk 1, and the memory iteration counter is incremented once.
     *
     * @param vertexProgram the program whose traversal is executed; it is {@code setup} against {@code memory} first
     * @param inputRDD      the graph RDD; only its values are read, and it is returned unchanged
     * @param memory        Spark-backed memory used for intermediate side effects, the halted traversers and the
     *                      iteration count
     * @return {@code inputRDD}, unmodified
     * @throws IllegalArgumentException if the end step is not one of the supported reducing barriers (a start step that
     *                                  is not a {@code GraphStep}, or an end step that is not a
     *                                  {@code ReducingBarrierStep}, fails earlier on the casts below)
     */
    public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
        vertexProgram.setup(memory);
        final Traversal.Admin<Vertex, Object> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
        final GraphStep<Vertex, Vertex> graphStep = ((GraphStep) traversal.getStartStep());
        final Object[] graphStepIds = graphStep.getIds();    // any V(1,2,3)-style ids to filter on
        final ReducingBarrierStep endStep = (ReducingBarrierStep) traversal.getEndStep(); // needed for the final traverser generation
        traversal.removeStep(0);                                    // remove GraphStep
        traversal.removeStep(traversal.getSteps().size() - 1);      // remove ReducingBarrierStep
        traversal.setStrategies(traversal.clone().getStrategies().removeStrategies(ComputerVerificationStrategy.class, ComputerFinalizationStrategy.class)); // no longer a computer job, but parallel standard jobs
        traversal.applyStrategies();                                // compile
        final boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
        ///////////////////////////////
        MemoryTraversalSideEffects.setMemorySideEffects(traversal, memory, ProgramPhase.EXECUTE); // any intermediate sideEffect steps are backed by SparkMemory
        final Object result;
        memory.setInExecute(true);
        try {
            final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
                    .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
                    .flatMap(vertexWritable -> {
                        if (identityTraversal)                      // g.V.count()-style (identity)
                            return IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1L));
                        else {                                      // add the vertex to head of the traversal
                            final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
                            clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1L));
                            return (Step) clone.getEndStep();      // the end step serves as the iterator over this vertex's results
                        }
                    });
            // USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL
            // NOTE(review): isEmpty() is a separate Spark action and nextRDD is not persisted here, so for
            // sum/mean/min/max the per-vertex traversal is evaluated once for the emptiness check and again for the
            // fold. Confirm that any intermediate side-effect steps tolerate this.
            if (endStep instanceof CountGlobalStep)
                result = nextRDD.map(Traverser::bulk).fold(0L, (a, b) -> a + b);
            else if (endStep instanceof SumGlobalStep) {
                // an empty stream produces no result (null) rather than the 0 seed
                result = nextRDD.isEmpty() ? null : nextRDD
                        .map(traverser -> {
                            final Number n = (Number) traverser.get();
                            // bulk is coerced to the value's own class (Long for a null value) before multiplying
                            final Class<? extends Number> clazz = null == n ? Long.class : n.getClass();
                            return NumberHelper.mul(n, NumberHelper.coerceTo(traverser.bulk(), clazz));
                        }).fold(0, NumberHelper::add);
            } else if (endStep instanceof MeanGlobalStep) {
                result = nextRDD.isEmpty() ? null : nextRDD
                        .map(traverser -> new MeanGlobalStep.MeanNumber((Number) traverser.get(), traverser.bulk()))
                        .fold(MeanNumberSupplier.instance().get(), MeanGlobalStep.MeanNumber::add)
                        .getFinal();
            } else if (endStep instanceof MinGlobalStep) {
                result = nextRDD.isEmpty() ? null : nextRDD
                        .map(traverser -> (Comparable) traverser.get())
                        .fold(Double.NaN, NumberHelper::min);
            } else if (endStep instanceof MaxGlobalStep) {
                result = nextRDD.isEmpty() ? null : nextRDD
                        .map(traverser -> (Comparable) traverser.get())
                        .fold(Double.NaN, NumberHelper::max);
            } else if (endStep instanceof FoldStep) {
                final BinaryOperator biOperator = endStep.getBiOperator();
                // decided once on the driver instead of re-inspecting the seed supplier for every traverser
                final boolean expandBulkIntoList = endStep.getSeedSupplier() instanceof ArrayListSupplier;
                result = nextRDD.map(traverser -> {
                    if (expandBulkIntoList) {
                        // a list fold must contain the value once per unit of bulk
                        final List list = new ArrayList<>();
                        for (long i = 0; i < traverser.bulk(); i++) {
                            list.add(traverser.get());
                        }
                        return list;
                    } else {
                        return traverser.get();
                    }
                }).fold(endStep.getSeedSupplier().get(), biOperator::apply);
            } else if (endStep instanceof GroupStep) {
                final GroupStep.GroupBiOperator<Object, Object> biOperator = (GroupStep.GroupBiOperator) endStep.getBiOperator();
                result = ((GroupStep) endStep).generateFinalResult(nextRDD
                        .mapPartitions(partitions -> {
                            // one step clone per partition for projecting that partition's traversers
                            final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone();
                            return IteratorUtils.map(partitions, clone::projectTraverser);
                        }).fold(((GroupStep<Object, Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply));
            } else if (endStep instanceof GroupCountStep) {
                final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance();
                result = nextRDD
                        .mapPartitions(partitions -> {
                            // one step clone per partition for projecting that partition's traversers
                            final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone();
                            return IteratorUtils.map(partitions, clone::projectTraverser);
                        })
                        .fold(((GroupCountStep<Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply);
            } else
                throw new IllegalArgumentException("The end step is an unsupported barrier: " + endStep);
        } finally {
            // Leave execute mode on every path: before HALTED_TRAVERSERS is written below (same order as before),
            // and also when a Spark job fails or the end step is unsupported, so memory is not left in execute mode.
            memory.setInExecute(false);
        }
        ///////////////////////////////

        // generate the HALTED_TRAVERSERS for the memory
        if (result != null) {
            final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
            haltedTraversers.add(traversal.getTraverserGenerator().generate(result, endStep, 1L)); // all reducing barrier steps produce a result of bulk 1
            memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
        }
        memory.incrIteration(); // any local star graph reduction takes a single iteration
        return inputRDD;
    }