in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java [58:145]
/**
 * Wires up a {@link Reductions} instance.
 *
 * <p>A fresh {@code ObjectContainer} is populated with the supplied collaborators (registered as
 * ready-made instances) and with a number of classes (registered by type), each under the label
 * and type shown below. The {@code Reductions} object is then obtained from that container.
 *
 * <p>NOTE(review): presumably the container instantiates the class-registered entries and
 * resolves their dependencies when {@code get} is called; confirm against {@code
 * ObjectContainer}.
 *
 * @param valve registered as {@code "backpressure-valve"}
 * @param statefulFunctionsUniverse source of the function providers, namespace function
 *     providers and types registered in the container
 * @param context registered as {@code "runtime-context"}; also queried for the parallelism
 *     figures used to build the {@code Partition}
 * @param keyedStateBackend registered as {@code "keyed-state-backend"}
 * @param timerServiceFactory registered as {@code "delayed-messages-timer-service-factory"}
 * @param delayedMessagesBufferState registered as {@code "delayed-messages-buffer-state"}
 * @param delayMessageIndex registered as {@code "delayed-message-index"}
 * @param sideOutputs egress-to-output-tag mapping handed to the {@code SideOutputSink}
 * @param output the output handed to both the {@code RemoteSink} and the {@code SideOutputSink}
 * @param messageFactory registered as an unlabeled instance
 * @param mailboxExecutor registered as {@code "mailbox-executor"}
 * @param metricGroup handed to the function-type metrics factory and the dispatcher metrics
 * @param asyncOperations registered as {@code "async-operations"}
 * @return the {@code Reductions} instance looked up from the populated container
 */
static Reductions create(
    BackPressureValve valve,
    StatefulFunctionsUniverse statefulFunctionsUniverse,
    RuntimeContext context,
    KeyedStateBackend<Object> keyedStateBackend,
    TimerServiceFactory timerServiceFactory,
    InternalListState<String, Long, Message> delayedMessagesBufferState,
    MapState<String, Long> delayMessageIndex,
    Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
    Output<StreamRecord<Message>> output,
    MessageFactory messageFactory,
    Executor mailboxExecutor,
    MetricGroup metricGroup,
    MapState<Long, Message> asyncOperations) {
  ObjectContainer wiring = new ObjectContainer();

  // --- function providers and the function repository ---
  wiring.add("function-providers", Map.class, statefulFunctionsUniverse.functions());
  wiring.add(
      "namespace-function-providers", Map.class, statefulFunctionsUniverse.namespaceFunctions());
  wiring.add("function-repository", FunctionRepository.class, StatefulFunctionRepository.class);
  // The metrics repository is an alias of the "function-repository" entry.
  wiring.addAlias(
      "function-metrics-repository",
      FunctionTypeMetricsRepository.class,
      "function-repository",
      FunctionRepository.class);

  // --- everything FlinkState needs ---
  wiring.add("runtime-context", RuntimeContext.class, context);
  wiring.add("keyed-state-backend", KeyedStateBackend.class, keyedStateBackend);
  DynamicallyRegisteredTypes registeredTypes =
      new DynamicallyRegisteredTypes(statefulFunctionsUniverse.types());
  wiring.add(registeredTypes);
  wiring.add("state", State.class, FlinkState.class);

  // --- everything Reductions needs ---
  wiring.add(messageFactory);

  int maxParallelism = context.getMaxNumberOfParallelSubtasks();
  int parallelism = context.getNumberOfParallelSubtasks();
  int subtaskIndex = context.getIndexOfThisSubtask();
  Partition thisPartition = new Partition(maxParallelism, parallelism, subtaskIndex);
  wiring.add(thisPartition);

  RemoteSink remoteSink = new RemoteSink(output);
  wiring.add(remoteSink);

  SideOutputSink sideOutputSink = new SideOutputSink(sideOutputs, output);
  wiring.add(sideOutputSink);

  wiring.add("applying-context", ApplyingContext.class, ReusableContext.class);
  wiring.add(LocalSink.class);
  wiring.add("function-loader", FunctionLoader.class, PredefinedFunctionLoader.class);
  wiring.add(Reductions.class);
  wiring.add(LocalFunctionGroup.class);

  // --- metrics, both backed by the supplied metric group ---
  FlinkFuncionTypeMetricsFactory functionTypeMetrics =
      new FlinkFuncionTypeMetricsFactory(metricGroup);
  wiring.add("function-metrics-factory", FuncionTypeMetricsFactory.class, functionTypeMetrics);

  FlinkFunctionDispatcherMetrics dispatcherMetrics =
      new FlinkFunctionDispatcherMetrics(metricGroup);
  wiring.add("function-dispatcher-metrics", FunctionDispatcherMetrics.class, dispatcherMetrics);

  // --- delayed messages ---
  wiring.add(
      "delayed-messages-buffer-state", InternalListState.class, delayedMessagesBufferState);
  wiring.add("delayed-message-index", MapState.class, delayMessageIndex);
  wiring.add(
      "delayed-messages-buffer",
      DelayedMessagesBuffer.class,
      FlinkStateDelayedMessagesBuffer.class);
  wiring.add(
      "delayed-messages-timer-service-factory", TimerServiceFactory.class, timerServiceFactory);
  wiring.add(DelaySink.class);
  wiring.add(DelayMessageHandler.class);

  // --- lazy providers for the sinks ---
  wiring.add("function-group", new Lazy<>(LocalFunctionGroup.class));
  wiring.add("reductions", new Lazy<>(Reductions.class));

  wiring.add("mailbox-executor", Executor.class, mailboxExecutor);

  // --- async operations ---
  wiring.add("async-operations", MapState.class, asyncOperations);
  wiring.add(AsyncSink.class);
  wiring.add(PendingAsyncOperations.class);

  wiring.add("backpressure-valve", BackPressureValve.class, valve);

  return wiring.get(Reductions.class);
}