in mantis-runtime/src/main/java/io/mantisrx/runtime/core/MantisStreamImpl.java [128:181]
/**
 * Builds a {@link MantisJobBuilder} from the stream DAG by visiting {@code operandNodes} in
 * iteration order and translating the function stored on each outgoing edge.
 *
 * <p>Edge functions are handled as follows:
 * <ul>
 *   <li>Self-loop carrying {@code MantisFunction.empty()}: the functions accumulated so far are
 *       materialized into a stage (using the node's codec and key codec) and the accumulator is
 *       reset.</li>
 *   <li>Self-loop carrying a {@code WindowFunction}: appended to the accumulator.</li>
 *   <li>{@code SourceFunction}: added as a stage when it is an {@code ObservableSourceImpl};
 *       otherwise ignored.</li>
 *   <li>{@code KeyByFunction}: emitted as its own group-computation stage, after which a fresh
 *       accumulator is started.</li>
 *   <li>{@code SinkFunction}: the accumulator is materialized into a stage (node codec only) and
 *       the sink is added when it is an {@code ObservableSinkImpl}.</li>
 *   <li>Anything else: appended to the accumulator.</li>
 * </ul>
 *
 * <p>Nodes without successors are skipped. Nodes with more than one non-self successor are only
 * reported with a warning; all of their edges are still processed.
 *
 * @param graphDag     value graph whose edge values are the functions connecting operand nodes
 * @param operandNodes nodes to visit; NOTE(review): the stage order follows this iteration order,
 *                     so callers presumably pass a topologically sorted view -- confirm
 * @return the job builder populated with the stages derived from the graph
 */
private MantisJobBuilder makeMantisJob(ImmutableValueGraph<OperandNode<?>, MantisFunction> graphDag, Iterable<OperandNode<?>> operandNodes) {
    MantisJobBuilder jobBuilder = new MantisJobBuilder();
    // Wrapped in an AtomicReference only so that the lambdas below can swap in a new combinator.
    // NOTE(review): the boolean constructor flag presumably marks a keyed combinator -- confirm.
    final AtomicReference<FunctionCombinator<?, ?>> composite = new AtomicReference<>(new FunctionCombinator<>(false));
    for (OperandNode<?> n : operandNodes) {
        Set<OperandNode<?>> successorsNodes = graphDag.successors(n);
        if (successorsNodes.isEmpty()) {
            continue;
        }

        // Handle the self-loop (if any) before the regular outgoing edges.
        Optional<MantisFunction> selfEdge = graphDag.edgeValue(n, n);
        int numSelfEdges = selfEdge.isPresent() ? 1 : 0;
        selfEdge.ifPresent(mantisFn -> {
            if (MantisFunction.empty().equals(mantisFn)) {
                // The empty marker closes the current stage and starts a new accumulator.
                jobBuilder.addStage(composite.get().makeStage(), n.getCodec(), n.getKeyCodec());
                composite.set(new FunctionCombinator<>(false));
            } else if (mantisFn instanceof WindowFunction) {
                composite.set(composite.get().add(mantisFn));
            }
            // No other types of self-edges are possible
        });

        if (successorsNodes.size() - numSelfEdges > 1) {
            log.warn("Found multi-output node {} with nbrs {}. Not supported yet!", n, successorsNodes);
        }

        for (OperandNode<?> successorsNode : successorsNodes) {
            // The self-loop was handled above. Compare with equals() so the check agrees with the
            // graph, which resolved edgeValue(n, n) through equals()/hashCode() as well.
            if (n.equals(successorsNode)) {
                continue;
            }
            graphDag.edgeValue(n, successorsNode).ifPresent(mantisFn -> {
                if (mantisFn instanceof SourceFunction) {
                    // Only observable-backed sources contribute a stage.
                    if (mantisFn instanceof ObservableSourceImpl) {
                        jobBuilder.addStage(((ObservableSourceImpl) mantisFn).getSource());
                    }
                } else if (mantisFn instanceof KeyByFunction) {
                    // ensure that the existing composite is empty here
                    if (composite.get().size() > 0) {
                        log.warn("Unempty composite found for KeyByFunction {}", composite.get());
                    }
                    jobBuilder.addStage(makeGroupComputation((KeyByFunction) mantisFn), n.getCodec(), n.getKeyCodec());
                    composite.set(new FunctionCombinator<>(true));
                } else if (mantisFn instanceof SinkFunction) {
                    // materialize all composite functions into a scalar stage
                    // it can't be a keyed stage because we didn't encounter a reduce function!
                    jobBuilder.addStage(composite.get().makeStage(), n.getCodec());
                    if (mantisFn instanceof ObservableSinkImpl) {
                        jobBuilder.addStage(((ObservableSinkImpl) mantisFn).getSink());
                    }
                } else {
                    // Any other function is folded into the stage currently being accumulated.
                    composite.set(composite.get().add(mantisFn));
                }
            });
        }
    }
    return jobBuilder;
}