in gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java [60:179]
protected static boolean execute(final Vertex vertex,
final Messenger<TraverserSet<Object>> messenger,
final TraversalMatrix<?, ?> traversalMatrix,
final Memory memory,
final boolean returnHaltedTraversers,
final TraverserSet<Object> haltedTraversers,
final HaltedTraverserStrategy haltedTraverserStrategy) {
final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
final AtomicBoolean voteToHalt = new AtomicBoolean(true);
final TraverserSet<Object> activeTraversers = new TraverserSet<>();
final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
////////////////////////////////
// GENERATE LOCAL TRAVERSERS //
///////////////////////////////
// MASTER ACTIVE
// these are traversers that are going from OLTP (master) to OLAP (workers)
// these traversers were broadcasted from the master traversal to the workers for attachment
final IndexedTraverserSet<Object,Vertex> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
// some memory systems are interacted with by multiple threads and thus, concurrent modification can happen at iterator.remove().
// its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it.
// most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing.
synchronized (maybeActiveTraversers) {
if (!maybeActiveTraversers.isEmpty()) {
final Collection<Traverser.Admin<Object>> traversers = maybeActiveTraversers.get(vertex);
if (traversers != null) {
final Iterator<Traverser.Admin<Object>> iterator = traversers.iterator();
while (iterator.hasNext()) {
final Traverser.Admin<Object> traverser = iterator.next();
iterator.remove();
maybeActiveTraversers.remove(traverser);
traverser.attach(Attachable.Method.get(vertex));
traverser.setSideEffects(traversalSideEffects);
toProcessTraversers.add(traverser);
}
}
}
}
// WORKER ACTIVE
// these are traversers that exist from from a local barrier
// these traversers will simply saved at the local vertex while the master traversal synchronized the barrier
vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
traverser.attach(Attachable.Method.get(vertex));
traverser.setSideEffects(traversalSideEffects);
toProcessTraversers.add(traverser);
});
assert previousActiveTraversers.isEmpty();
// remove the property to save space
vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
});
// TRAVERSER MESSAGES (WORKER -> WORKER)
// these are traversers that have been messaged to the vertex from another vertex
final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
while (messages.hasNext()) {
IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> {
if (traverser.isHalted()) {
if (returnHaltedTraversers)
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
else
haltedTraversers.add(traverser); // the traverser has already been detached so no need to detach it again
} else {
// traverser is not halted and thus, should be processed locally
// attach it and process
traverser.attach(Attachable.Method.get(vertex));
traverser.setSideEffects(traversalSideEffects);
toProcessTraversers.add(traverser);
}
});
}
///////////////////////////////
// PROCESS LOCAL TRAVERSERS //
//////////////////////////////
// while there are still local traversers, process them until they leave the vertex (message pass) or halt (store).
while (!toProcessTraversers.isEmpty()) {
Step<Object, Object> previousStep = EmptyStep.instance();
Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
while (traversers.hasNext()) {
final Traverser.Admin<Object> traverser = traversers.next();
traversers.remove();
final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
// try and fill up the current step as much as possible with traversers to get a bulking optimization
if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
currentStep.addStart(traverser);
previousStep = currentStep;
}
WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
// all processed traversers should be either halted or active
assert toProcessTraversers.isEmpty();
// process all the local objects and send messages or store locally again
if (!activeTraversers.isEmpty()) {
traversers = activeTraversers.iterator();
while (traversers.hasNext()) {
final Traverser.Admin<Object> traverser = traversers.next();
traversers.remove();
// decide whether to message the traverser or to process it locally
if (traverser.get() instanceof Element || traverser.get() instanceof Property) { // GRAPH OBJECT
// if the element is remote, then message, else store it locally for re-processing
final Vertex hostingVertex = Host.getHostingVertex(traverser.get());
if (!vertex.equals(hostingVertex)) { // if its host is not the current vertex, then send the traverser to the hosting vertex
voteToHalt.set(false); // if message is passed, then don't vote to halt
messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser.detach()));
} else {
traverser.attach(Attachable.Method.get(vertex)); // necessary for select() steps that reference the current object
toProcessTraversers.add(traverser);
}
} else // STANDARD OBJECT
toProcessTraversers.add(traverser);
}
assert activeTraversers.isEmpty();
}
}
return voteToHalt.get();
}