protected static boolean execute()

in gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java [60:179]


    protected static boolean execute(final Vertex vertex,
                                     final Messenger<TraverserSet<Object>> messenger,
                                     final TraversalMatrix<?, ?> traversalMatrix,
                                     final Memory memory,
                                     final boolean returnHaltedTraversers,
                                     final TraverserSet<Object> haltedTraversers,
                                     final HaltedTraverserStrategy haltedTraverserStrategy) {
        final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
        final AtomicBoolean voteToHalt = new AtomicBoolean(true);
        final TraverserSet<Object> activeTraversers = new TraverserSet<>();
        final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();

        ////////////////////////////////
        // GENERATE LOCAL TRAVERSERS //
        ///////////////////////////////

        // MASTER ACTIVE
        // these are traversers that are going from OLTP (master) to OLAP (workers)
        // these traversers were broadcasted from the master traversal to the workers for attachment
        final IndexedTraverserSet<Object,Vertex> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
        // some memory systems are interacted with by multiple threads and thus, concurrent modification can happen at iterator.remove().
        // its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it.
        // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing.
        synchronized (maybeActiveTraversers) {
            if (!maybeActiveTraversers.isEmpty()) {
                final Collection<Traverser.Admin<Object>> traversers = maybeActiveTraversers.get(vertex);
                if (traversers != null) {
                    final Iterator<Traverser.Admin<Object>> iterator = traversers.iterator();
                    while (iterator.hasNext()) {
                        final Traverser.Admin<Object> traverser = iterator.next();
                        iterator.remove();
                        maybeActiveTraversers.remove(traverser);
                        traverser.attach(Attachable.Method.get(vertex));
                        traverser.setSideEffects(traversalSideEffects);
                        toProcessTraversers.add(traverser);
                    }
                }
            }
        }

        // WORKER ACTIVE
        // these are traversers that exist from from a local barrier
        // these traversers will simply saved at the local vertex while the master traversal synchronized the barrier
        vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
            IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
                traverser.attach(Attachable.Method.get(vertex));
                traverser.setSideEffects(traversalSideEffects);
                toProcessTraversers.add(traverser);
            });
            assert previousActiveTraversers.isEmpty();
            // remove the property to save space
            vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
        });

        // TRAVERSER MESSAGES (WORKER -> WORKER)
        // these are traversers that have been messaged to the vertex from another vertex
        final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
        while (messages.hasNext()) {
            IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> {
                if (traverser.isHalted()) {
                    if (returnHaltedTraversers)
                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                    else
                        haltedTraversers.add(traverser); // the traverser has already been detached so no need to detach it again
                } else {
                    // traverser is not halted and thus, should be processed locally
                    // attach it and process
                    traverser.attach(Attachable.Method.get(vertex));
                    traverser.setSideEffects(traversalSideEffects);
                    toProcessTraversers.add(traverser);
                }
            });
        }

        ///////////////////////////////
        // PROCESS LOCAL TRAVERSERS //
        //////////////////////////////

        // while there are still local traversers, process them until they leave the vertex (message pass) or halt (store).
        while (!toProcessTraversers.isEmpty()) {
            Step<Object, Object> previousStep = EmptyStep.instance();
            Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
            while (traversers.hasNext()) {
                final Traverser.Admin<Object> traverser = traversers.next();
                traversers.remove();
                final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
                // try and fill up the current step as much as possible with traversers to get a bulking optimization
                if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
                    WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
                currentStep.addStart(traverser);
                previousStep = currentStep;
            }
            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
            // all processed traversers should be either halted or active
            assert toProcessTraversers.isEmpty();
            // process all the local objects and send messages or store locally again
            if (!activeTraversers.isEmpty()) {
                traversers = activeTraversers.iterator();
                while (traversers.hasNext()) {
                    final Traverser.Admin<Object> traverser = traversers.next();
                    traversers.remove();
                    // decide whether to message the traverser or to process it locally
                    if (traverser.get() instanceof Element || traverser.get() instanceof Property) {      // GRAPH OBJECT
                        // if the element is remote, then message, else store it locally for re-processing
                        final Vertex hostingVertex = Host.getHostingVertex(traverser.get());
                        if (!vertex.equals(hostingVertex)) { // if its host is not the current vertex, then send the traverser to the hosting vertex
                            voteToHalt.set(false); // if message is passed, then don't vote to halt
                            messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser.detach()));
                        } else {
                            traverser.attach(Attachable.Method.get(vertex)); // necessary for select() steps that reference the current object
                            toProcessTraversers.add(traverser);
                        }
                    } else                                                                              // STANDARD OBJECT
                        toProcessTraversers.add(traverser);
                }
                assert activeTraversers.isEmpty();
            }
        }
        return voteToHalt.get();
    }