private MantisJobBuilder makeMantisJob()

in mantis-runtime/src/main/java/io/mantisrx/runtime/core/MantisStreamImpl.java [128:181]


    private MantisJobBuilder makeMantisJob(ImmutableValueGraph<OperandNode<?>, MantisFunction> graphDag, Iterable<OperandNode<?>> operandNodes) {
        MantisJobBuilder jobBuilder = new MantisJobBuilder();
        final AtomicReference<FunctionCombinator<?, ?>> composite = new AtomicReference<>(new FunctionCombinator<>(false));
        for (OperandNode<?> n : operandNodes) {
            Set<OperandNode<?>> successorsNodes = graphDag.successors(n);
            if (successorsNodes.size() == 0) {
                continue;
            }
            // remove self-loop
            Optional<MantisFunction> selfEdge = graphDag.edgeValue(n, n);
            Integer numSelfEdges = selfEdge.map(x -> 1).orElse(0);
            selfEdge.ifPresent(mantisFn -> {
                if (MantisFunction.empty().equals(mantisFn)) {
                    jobBuilder.addStage(composite.get().makeStage(), n.getCodec(), n.getKeyCodec());
                    composite.set(new FunctionCombinator<>(false));
                } else if (mantisFn instanceof WindowFunction) {
                    composite.set(composite.get().add(mantisFn));
                }
                // No other types of self-edges are possible
            });
            if (successorsNodes.size() - numSelfEdges > 1) {
                log.warn("Found multi-output node {} with nbrs {}. Not supported yet!", n, successorsNodes);
            }
            for (OperandNode<?> successorsNode : successorsNodes) {
                if (successorsNode == n) {
                    continue;
                }
                graphDag.edgeValue(n, successorsNode).ifPresent(mantisFn -> {
                    if (mantisFn instanceof SourceFunction) {
                        if (mantisFn instanceof ObservableSourceImpl) {
                            jobBuilder.addStage(((ObservableSourceImpl) mantisFn).getSource());
                        }
                    } else if (mantisFn instanceof KeyByFunction) {
                        // ensure that the existing composite is empty here
                        if (composite.get().size() > 0) {
                            log.warn("Unempty composite found for KeyByFunction {}", composite.get());
                        }
                        jobBuilder.addStage(makeGroupComputation((KeyByFunction) mantisFn), n.getCodec(), n.getKeyCodec());
                        composite.set(new FunctionCombinator<>(true));
                    } else if (mantisFn instanceof SinkFunction) {
                        // materialize all composite functions into a scalar stage
                        // it can't be a keyed stage because we didn't encounter a reduce function!
                        jobBuilder.addStage(composite.get().makeStage(), n.getCodec());
                        if (mantisFn instanceof ObservableSinkImpl) {
                            jobBuilder.addStage(((ObservableSinkImpl) mantisFn).getSink());
                        }
                    } else {
                        composite.set(composite.get().add(mantisFn));
                    }
                });
            }
        }
        return jobBuilder;
    }