private static void drainStep()

in gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java [181:246]


    /**
     * Drains everything {@code step} can currently produce and routes each result to its destination.
     * <p>
     * The method first calls {@code GraphComputing.atMaster(step, false)} and then takes one of three paths:
     * <ul>
     * <li>Non-{@link Barrier} step: every traverser is pulled from the step and is either halted
     * (see below) or added to {@code activeTraversers}.</li>
     * <li>{@link LocalBarrier} step: every barrier set is emptied; traversers that are not halted here are
     * detached and stored in the {@code TraversalVertexProgram.ACTIVE_TRAVERSERS} property of {@code vertex}.</li>
     * <li>Any other {@link Barrier}: each barrier value is added to {@code memory} under the step id, or an
     * empty {@code TraverserSet} if the barrier produced nothing.</li>
     * </ul>
     * Both barrier paths finish by adding the step id to {@code TraversalVertexProgram.MUTATED_MEMORY_KEYS}.
     * <p>
     * A traverser that is halted here goes to {@code memory} under
     * {@code TraversalVertexProgram.HALTED_TRAVERSERS} (after {@code haltedTraverserStrategy.halt(...)}) when
     * {@code returnHaltedTraversers} is set, otherwise it is detached and added to {@code haltedTraversers}.
     *
     * @param vertex                  the vertex the step is being drained for
     * @param step                    the step to drain
     * @param activeTraversers        receives traversers from non-barrier steps that are not halted here
     * @param haltedTraversers        receives detached halted traversers when they are not returned via memory
     * @param memory                  receives barrier values, mutated memory keys and returned halted traversers
     * @param returnHaltedTraversers  whether halted traversers are written to {@code memory} instead of {@code haltedTraversers}
     * @param haltedTraverserStrategy supplies the halted form of a traverser and the halted traverser factory class
     */
    private static void drainStep(final Vertex vertex,
                                  final Step<Object, Object> step,
                                  final TraverserSet<Object> activeTraversers,
                                  final TraverserSet<Object> haltedTraversers,
                                  final Memory memory,
                                  final boolean returnHaltedTraversers,
                                  final HaltedTraverserStrategy haltedTraverserStrategy) {
        GraphComputing.atMaster(step, false);

        if (!(step instanceof Barrier)) { // LOCAL PROCESSING
            step.forEachRemaining(admin -> {
                final boolean haltHere;
                if (!admin.isHalted()) {
                    haltHere = false;
                } else if ((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserStrategy.getHaltedTraverserFactory())
                        && !(admin.get() instanceof Element || admin.get() instanceof Property)) {
                    // if its a ReferenceFactory (one less iteration required)
                    haltHere = true;
                } else {
                    // NOTE(review): this branch is also reached for values that are neither Element nor Property
                    // when the condition above does not apply -- confirm Host.getHostingVertex() copes with them
                    haltHere = Host.getHostingVertex(admin.get()).equals(vertex);
                }

                if (!haltHere) {
                    activeTraversers.add(admin);
                } else if (returnHaltedTraversers) {
                    memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(admin)));
                } else {
                    haltedTraversers.add(admin.detach());
                }
            });
            return;
        }

        if (step instanceof Bypassing) {
            ((Bypassing) step).setBypass(true);
        }

        if (step instanceof LocalBarrier) {
            // local barrier traversers are stored on the vertex until the master traversal synchronizes the system
            final LocalBarrier<Object> localBarrier = (LocalBarrier<Object>) step;
            final TraverserSet<Object> parkedTraversers = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
            vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, parkedTraversers);
            while (localBarrier.hasNextBarrier()) {
                final TraverserSet<Object> batch = localBarrier.nextBarrier();
                IteratorUtils.removeOnNext(batch.iterator()).forEachRemaining(admin -> {
                    admin.addLabels(step.getLabels());  // this might need to be generalized for working with global barriers too

                    final boolean haltHere;
                    if (!admin.isHalted()) {
                        haltHere = false;
                    } else if (returnHaltedTraversers) {
                        haltHere = true;
                    } else if (!(admin.get() instanceof Element || admin.get() instanceof Property)) {
                        haltHere = true;
                    } else {
                        haltHere = Host.getHostingVertex(admin.get()).equals(vertex);
                    }

                    if (!haltHere) {
                        parkedTraversers.add(admin.detach());
                    } else if (returnHaltedTraversers) {
                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(admin)));
                    } else {
                        haltedTraversers.add(admin.detach());
                    }
                });
            }
        } else {
            final Barrier globalBarrier = (Barrier) step;
            if (!globalBarrier.hasNextBarrier()) {
                // ensure the step id gets added to memory or else barriers that filter like order().by('no-exist')
                // will end in error when that memory key can't be found by MasterExecutor.processMemory()
                memory.add(step.getId(), new TraverserSet<>());
            } else {
                while (globalBarrier.hasNextBarrier()) {
                    memory.add(step.getId(), globalBarrier.nextBarrier());
                }
            }
        }

        // both barrier flavours record the step id as a mutated memory key
        memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
    }