static Reductions create()

in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java [58:145]


  static Reductions create(
      BackPressureValve valve,
      StatefulFunctionsUniverse statefulFunctionsUniverse,
      RuntimeContext context,
      KeyedStateBackend<Object> keyedStateBackend,
      TimerServiceFactory timerServiceFactory,
      InternalListState<String, Long, Message> delayedMessagesBufferState,
      MapState<String, Long> delayMessageIndex,
      Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
      Output<StreamRecord<Message>> output,
      MessageFactory messageFactory,
      Executor mailboxExecutor,
      MetricGroup metricGroup,
      MapState<Long, Message> asyncOperations) {

    ObjectContainer container = new ObjectContainer();

    container.add("function-providers", Map.class, statefulFunctionsUniverse.functions());
    container.add(
        "namespace-function-providers", Map.class, statefulFunctionsUniverse.namespaceFunctions());
    container.add(
        "function-repository", FunctionRepository.class, StatefulFunctionRepository.class);
    container.addAlias(
        "function-metrics-repository",
        FunctionTypeMetricsRepository.class,
        "function-repository",
        FunctionRepository.class);

    // for FlinkState
    container.add("runtime-context", RuntimeContext.class, context);
    container.add("keyed-state-backend", KeyedStateBackend.class, keyedStateBackend);
    container.add(new DynamicallyRegisteredTypes(statefulFunctionsUniverse.types()));
    container.add("state", State.class, FlinkState.class);

    // For reductions
    container.add(messageFactory);

    container.add(
        new Partition(
            context.getMaxNumberOfParallelSubtasks(),
            context.getNumberOfParallelSubtasks(),
            context.getIndexOfThisSubtask()));

    container.add(new RemoteSink(output));
    container.add(new SideOutputSink(sideOutputs, output));

    container.add("applying-context", ApplyingContext.class, ReusableContext.class);
    container.add(LocalSink.class);
    container.add("function-loader", FunctionLoader.class, PredefinedFunctionLoader.class);
    container.add(Reductions.class);
    container.add(LocalFunctionGroup.class);
    container.add(
        "function-metrics-factory",
        FuncionTypeMetricsFactory.class,
        new FlinkFuncionTypeMetricsFactory(metricGroup));
    container.add(
        "function-dispatcher-metrics",
        FunctionDispatcherMetrics.class,
        new FlinkFunctionDispatcherMetrics(metricGroup));

    // for delayed messages
    container.add(
        "delayed-messages-buffer-state", InternalListState.class, delayedMessagesBufferState);
    container.add("delayed-message-index", MapState.class, delayMessageIndex);
    container.add(
        "delayed-messages-buffer",
        DelayedMessagesBuffer.class,
        FlinkStateDelayedMessagesBuffer.class);
    container.add(
        "delayed-messages-timer-service-factory", TimerServiceFactory.class, timerServiceFactory);
    container.add(DelaySink.class);
    container.add(DelayMessageHandler.class);

    // lazy providers for the sinks
    container.add("function-group", new Lazy<>(LocalFunctionGroup.class));
    container.add("reductions", new Lazy<>(Reductions.class));

    container.add("mailbox-executor", Executor.class, mailboxExecutor);

    // for the async operations
    container.add("async-operations", MapState.class, asyncOperations);
    container.add(AsyncSink.class);
    container.add(PendingAsyncOperations.class);

    container.add("backpressure-valve", BackPressureValve.class, valve);

    return container.get(Reductions.class);
  }