in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java [94:151]
public void open() throws Exception {
  // Start-up hook of this operator. After delegating to the parent open(), it builds the state
  // handles, the back pressure valve and the Reductions instance used by this operator, and
  // finally expires the async operations that are still pending.
  //
  // Throws NullPointerException when mailboxExecutor is null, and IllegalStateException when
  // no time service manager is available.
  super.open();

  final StatefulFunctionsUniverse universe = statefulFunctionsUniverse(configuration);

  //
  // The serializer configured for input 0 of this operator; each descriptor that stores
  // messages receives its own duplicate of it.
  //
  final TypeSerializer<Message> messageSerializer =
      getOperatorConfig().getTypeSerializerIn(0, getContainingTask().getUserCodeClassLoader());

  final MapStateDescriptor<Long, Message> asyncOpsDescriptor =
      new MapStateDescriptor<>(
          "asyncOperations", LongSerializer.INSTANCE, messageSerializer.duplicate());
  final ListStateDescriptor<Message> delayedBufferDescriptor =
      new ListStateDescriptor<>(
          FlinkStateDelayedMessagesBuffer.BUFFER_STATE_NAME, messageSerializer.duplicate());
  final MapStateDescriptor<String, Long> delayedIndexDescriptor =
      new MapStateDescriptor<>(
          FlinkStateDelayedMessagesBuffer.INDEX_STATE_NAME, String.class, Long.class);

  //
  // Obtain the map states from the runtime context: the delayed message index first, then the
  // async operations.
  //
  final MapState<String, Long> delayedIndexState =
      getRuntimeContext().getMapState(delayedIndexDescriptor);
  final MapState<Long, Message> asyncOpsState =
      getRuntimeContext().getMapState(asyncOpsDescriptor);

  Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");

  //
  // The valve threshold is the configured maximum number of async operations per task.
  //
  this.backPressureValve =
      new ThresholdBackPressureValve(configuration.getMaxAsyncOperationsPerTask());

  //
  // Keep track of the function providers that manage resources, so that they can be closed
  // when this task closes.
  //
  this.managingResources = resourceManagingFunctionProviders(universe.functions());

  //
  // A missing time service manager is reported as an IllegalStateException.
  //
  final FlinkTimerServiceFactory timerServiceFactory =
      new FlinkTimerServiceFactory(
          super.getTimeServiceManager().orElseThrow(IllegalStateException::new));

  //
  // The core logic of applying messages to functions.
  //
  this.reductions =
      Reductions.create(
          backPressureValve,
          universe,
          getRuntimeContext(),
          getKeyedStateBackend(),
          timerServiceFactory,
          delayedMessagesBufferState(delayedBufferDescriptor),
          delayedIndexState,
          sideOutputs,
          output,
          MessageFactory.forKey(universe.messageFactoryKey()),
          new MailboxExecutorFacade(mailboxExecutor, "Stateful Functions Mailbox"),
          getRuntimeContext().getMetricGroup().addGroup("functions"),
          asyncOpsState);

  //
  // Expire every async operation that is still pending.
  //
  AsyncOperationFailureNotifier.fireExpiredAsyncOperations(
      asyncOpsDescriptor, reductions, getKeyedStateBackend());
}