public void open()

in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java [94:151]


  public void open() throws Exception {
    super.open();
    final StatefulFunctionsUniverse statefulFunctionsUniverse =
        statefulFunctionsUniverse(configuration);

    final TypeSerializer<Message> envelopeSerializer =
        getOperatorConfig().getTypeSerializerIn(0, getContainingTask().getUserCodeClassLoader());
    final MapStateDescriptor<Long, Message> asyncOperationStateDescriptor =
        new MapStateDescriptor<>(
            "asyncOperations", LongSerializer.INSTANCE, envelopeSerializer.duplicate());
    final ListStateDescriptor<Message> delayedMessageStateDescriptor =
        new ListStateDescriptor<>(
            FlinkStateDelayedMessagesBuffer.BUFFER_STATE_NAME, envelopeSerializer.duplicate());
    final MapStateDescriptor<String, Long> delayedMessageIndexDescriptor =
        new MapStateDescriptor<>(
            FlinkStateDelayedMessagesBuffer.INDEX_STATE_NAME, String.class, Long.class);
    final MapState<String, Long> delayedMessageIndex =
        getRuntimeContext().getMapState(delayedMessageIndexDescriptor);
    final MapState<Long, Message> asyncOperationState =
        getRuntimeContext().getMapState(asyncOperationStateDescriptor);

    Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");

    this.backPressureValve =
        new ThresholdBackPressureValve(configuration.getMaxAsyncOperationsPerTask());

    //
    // Remember what function providers are managing resources, so that we can close them when
    // this task closes.
    this.managingResources =
        resourceManagingFunctionProviders(statefulFunctionsUniverse.functions());

    //
    // the core logic of applying messages to functions.
    //
    this.reductions =
        Reductions.create(
            backPressureValve,
            statefulFunctionsUniverse,
            getRuntimeContext(),
            getKeyedStateBackend(),
            new FlinkTimerServiceFactory(
                super.getTimeServiceManager().orElseThrow(IllegalStateException::new)),
            delayedMessagesBufferState(delayedMessageStateDescriptor),
            delayedMessageIndex,
            sideOutputs,
            output,
            MessageFactory.forKey(statefulFunctionsUniverse.messageFactoryKey()),
            new MailboxExecutorFacade(mailboxExecutor, "Stateful Functions Mailbox"),
            getRuntimeContext().getMetricGroup().addGroup("functions"),
            asyncOperationState);

    //
    // expire all the pending async operations.
    //
    AsyncOperationFailureNotifier.fireExpiredAsyncOperations(
        asyncOperationStateDescriptor, reductions, getKeyedStateBackend());
  }