in gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java [181:246]
/**
 * Drains everything currently available from {@code step} and routes each result to the
 * appropriate destination.
 * <ul>
 * <li>Non-barrier steps: each traverser is either recorded as halted or added to
 * {@code activeTraversers}.</li>
 * <li>{@link LocalBarrier} steps: each barrier traverser is either recorded as halted or
 * detached and stored in the vertex's {@code TraversalVertexProgram.ACTIVE_TRAVERSERS}
 * property.</li>
 * <li>Any other {@link Barrier}: every barrier produced is added to {@code memory} under the
 * step id (an empty {@code TraverserSet} if the step produced none).</li>
 * </ul>
 * For both barrier kinds the step id is also added to
 * {@code TraversalVertexProgram.MUTATED_MEMORY_KEYS}.
 *
 * @param vertex                  the vertex whose {@code ACTIVE_TRAVERSERS} property receives local-barrier
 *                                traversers and against which a halted traverser's hosting vertex is compared
 * @param step                    the step to drain
 * @param activeTraversers        receives non-halted (or not-yet-settled) traversers of non-barrier steps
 * @param haltedTraversers        receives detached halted traversers when {@code returnHaltedTraversers} is false
 * @param memory                  receives barrier contents, halted traversers (when
 *                                {@code returnHaltedTraversers} is true) and mutated memory keys
 * @param returnHaltedTraversers  when true, halted traversers are written to {@code memory} instead of
 *                                {@code haltedTraversers}
 * @param haltedTraverserStrategy used to halt traversers written to {@code memory}; its factory is also
 *                                compared with {@code ReferenceFactory} on the non-barrier path
 */
private static void drainStep(final Vertex vertex,
                              final Step<Object, Object> step,
                              final TraverserSet<Object> activeTraversers,
                              final TraverserSet<Object> haltedTraversers,
                              final Memory memory,
                              final boolean returnHaltedTraversers,
                              final HaltedTraverserStrategy haltedTraverserStrategy) {
    GraphComputing.atMaster(step, false);

    if (!(step instanceof Barrier)) {
        // LOCAL PROCESSING
        step.forEachRemaining(traverser -> {
            // Note the grouping: (A || B) && nonElementValue, OR hosted-on-this-vertex.
            final boolean settlesHere = traverser.isHalted()
                    // if its a ReferenceFactory (one less iteration required)
                    && ((returnHaltedTraversers || haltedTraverserStrategy.getHaltedTraverserFactory() == ReferenceFactory.class)
                    && !(traverser.get() instanceof Element || traverser.get() instanceof Property)
                    || Host.getHostingVertex(traverser.get()).equals(vertex));
            if (!settlesHere) {
                activeTraversers.add(traverser);
            } else if (returnHaltedTraversers) {
                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
            } else {
                haltedTraversers.add(traverser.detach());
            }
        });
        return;
    }

    // From here on the step is known to be a Barrier.
    if (step instanceof Bypassing)
        ((Bypassing) step).setBypass(true);

    if (step instanceof LocalBarrier) {
        // local barrier traversers are stored on the vertex until the master traversal synchronizes the system
        final LocalBarrier<Object> localBarrier = (LocalBarrier<Object>) step;
        final TraverserSet<Object> heldOnVertex = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
        vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, heldOnVertex);
        while (localBarrier.hasNextBarrier()) {
            final TraverserSet<Object> batch = localBarrier.nextBarrier();
            // each traverser is removed from the batch as it is consumed
            IteratorUtils.removeOnNext(batch.iterator()).forEachRemaining(traverser -> {
                traverser.addLabels(step.getLabels()); // this might need to be generalized for working with global barriers too
                final boolean settlesHere = traverser.isHalted()
                        && (returnHaltedTraversers
                        || !(traverser.get() instanceof Element || traverser.get() instanceof Property)
                        || Host.getHostingVertex(traverser.get()).equals(vertex));
                if (!settlesHere)
                    heldOnVertex.add(traverser.detach());
                else if (returnHaltedTraversers)
                    memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                else
                    haltedTraversers.add(traverser.detach());
            });
        }
    } else {
        final Barrier globalBarrier = (Barrier) step;
        if (!globalBarrier.hasNextBarrier()) {
            // ensure the step id gets added to memory or else barriers that filter like order().by('no-exist')
            // will end in error when that memory key can't be found by MasterExecutor.processMemory()
            memory.add(step.getId(), new TraverserSet<>());
        } else {
            while (globalBarrier.hasNextBarrier()) {
                memory.add(step.getId(), globalBarrier.nextBarrier());
            }
        }
    }

    // both barrier flavours register the step id as a mutated memory key
    memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
}