public Future<ComputerResult> submit()

in tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java [140:259]


    /**
     * Validates the configuration of this computer and schedules the computation on
     * {@code computerService}.
     *
     * <p>Before anything is scheduled the method checks, in order, that this computer has not been
     * submitted before, that it has a vertex program or at least one map-reduce job, that the
     * resolved result-graph/persist combination is supported by {@code features()}, and that the
     * requested number of workers does not exceed {@code features().getMaxWorkers()}. Each failed
     * check throws the exception produced by the corresponding {@code Exceptions} factory method.
     *
     * <p>The scheduled task runs the vertex program (when one is set) until its {@code terminate}
     * method returns {@code true}, then runs every registered map-reduce job, and finally wraps the
     * resulting graph together with an immutable view of the memory in a
     * {@link DefaultComputerResult}. An {@code InterruptedException} raised inside the task is
     * rethrown as a {@code TraversalInterruptedException}; any other {@code Exception} is wrapped
     * in a {@code RuntimeException}.
     *
     * <p>{@code computerService} is shut down immediately after the task has been handed to it.
     *
     * @return a future that yields the result of the computation
     */
    public Future<ComputerResult> submit() {
        // a computer instance is single-use: reject a second submission, otherwise mark it as used
        if (this.executed)
            throw Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
        this.executed = true;

        // there has to be something to run: a vertex program, map-reduce jobs, or both
        if (this.vertexProgram == null && this.mapReducers.isEmpty())
            throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();

        // map-reduce jobs may run without a vertex program; when a program is present it is
        // validated against this computer and contributes its own map-reduce jobs
        if (this.vertexProgram != null) {
            GraphComputerHelper.validateProgramOnComputer(this, this.vertexProgram);
            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
        }

        // resolve the result graph and persist settings used for this computation
        this.resultGraph = GraphComputerHelper.getResultGraphState(
                Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.resultGraph));
        this.persist = GraphComputerHelper.getPersistState(
                Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.persist));
        if (!this.features().supportsResultGraphPersistCombination(this.resultGraph, this.persist))
            throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraph, this.persist);

        // the requested worker count must not exceed the supported maximum
        if (this.workers > this.features().getMaxWorkers())
            throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());

        // set up the memory shared by the vertex program and the map-reduce jobs
        this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);

        final Future<ComputerResult> future = computerService.submit(() -> {
            final long startTime = System.currentTimeMillis();
            final TinkerGraphComputerView computerView = TinkerHelper.createGraphComputerView(
                    this.graph,
                    this.graphFilter,
                    this.vertexProgram == null ? Collections.emptySet() : this.vertexProgram.getVertexComputeKeys());
            final TinkerWorkerPool workerPool = new TinkerWorkerPool(this.graph, this.memory, this.workers);
            try {
                if (this.vertexProgram != null) {
                    // vertex program phase: iterate until the program reports termination
                    this.vertexProgram.setup(this.memory);
                    boolean terminated;
                    do {
                        if (Thread.interrupted()) throw new TraversalInterruptedException();
                        this.memory.completeSubRound();
                        workerPool.setVertexProgram(this.vertexProgram);
                        workerPool.executeVertexProgram((vertexIterator, program, workerMemory) -> {
                            program.workerIterationStart(workerMemory.asImmutable());
                            while (vertexIterator.hasNext()) {
                                final Vertex current = vertexIterator.next();
                                if (Thread.interrupted()) throw new TraversalInterruptedException();
                                program.execute(
                                        ComputerGraph.vertexProgram(current, program),
                                        new TinkerMessenger<>(current, this.messageBoard, program.getMessageCombiner()),
                                        workerMemory);
                            }
                            program.workerIterationEnd(workerMemory.asImmutable());
                            workerMemory.complete();
                        });
                        this.messageBoard.completeIteration();
                        this.memory.completeSubRound();
                        // the termination check happens before the iteration counter is advanced,
                        // and the counter is advanced whether or not the program terminates
                        terminated = this.vertexProgram.terminate(this.memory);
                        this.memory.incrIteration();
                    } while (!terminated);
                    // drop all transient vertex compute keys
                    computerView.complete();
                }

                // map-reduce phase: every registered job runs its map stage and, if it has one,
                // its reduce stage
                for (final MapReduce mapReduce : mapReducers) {
                    final TinkerMapEmitter<?, ?> emitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
                    final SynchronizedIterator<Vertex> vertexSource = new SynchronizedIterator<>(this.graph.vertices());
                    workerPool.setMapReduce(mapReduce);
                    workerPool.executeMapReduce(workerJob -> {
                        workerJob.workerStart(MapReduce.Stage.MAP);
                        while (true) {
                            if (Thread.interrupted()) throw new TraversalInterruptedException();
                            final Vertex current = vertexSource.next();
                            // a null from the shared iterator ends this worker's loop
                            if (current == null) break;
                            workerJob.map(ComputerGraph.mapReduce(current), emitter);
                        }
                        workerJob.workerEnd(MapReduce.Stage.MAP);
                    });
                    // sort results if a map output sort is defined
                    emitter.complete(mapReduce);

                    // combiners are skipped because everything runs on a single machine
                    if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
                        final TinkerReduceEmitter<?, ?> reducer = new TinkerReduceEmitter<>();
                        final SynchronizedIterator<Map.Entry<?, Queue<?>>> entries =
                                new SynchronizedIterator((Iterator) emitter.reduceMap.entrySet().iterator());
                        workerPool.executeMapReduce(workerJob -> {
                            workerJob.workerStart(MapReduce.Stage.REDUCE);
                            while (true) {
                                if (Thread.interrupted()) throw new TraversalInterruptedException();
                                final Map.Entry<?, Queue<?>> keyed = entries.next();
                                if (keyed == null) break;
                                workerJob.reduce(keyed.getKey(), keyed.getValue().iterator(), reducer);
                            }
                            workerJob.workerEnd(MapReduce.Stage.REDUCE);
                        });
                        // sort results if a reduce output sort is defined
                        reducer.complete(mapReduce);
                        mapReduce.addResultToMemory(this.memory, reducer.reduceQueue.iterator());
                    } else {
                        mapReduce.addResultToMemory(this.memory, emitter.mapQueue.iterator());
                    }
                }

                // record the elapsed time, then finalize the memory
                this.memory.setRuntime(System.currentTimeMillis() - startTime);
                this.memory.complete(); // drop all transient properties and set iteration

                // the resultant graph depends on the result graph/persist state
                final Graph computedGraph = computerView.processResultGraphPersist(this.resultGraph, this.persist);
                // NOTE(review): the view is only dropped from the source graph on this success
                // path; confirm whether it should also be dropped when the computation fails
                TinkerHelper.dropGraphComputerView(this.graph);
                return new DefaultComputerResult(computedGraph, this.memory.asImmutable());
            } catch (InterruptedException ie) {
                // NOTE(review): the thread's interrupt status is not restored here; confirm that
                // this is intentional
                workerPool.closeNow();
                throw new TraversalInterruptedException();
            } catch (Exception ex) {
                workerPool.closeNow();
                throw new RuntimeException(ex);
            } finally {
                workerPool.close();
            }
        });
        // no further tasks are accepted by the service once this computation has been submitted
        this.computerService.shutdown();
        return future;
    }