public void initializeState()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [1153:1223]


    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Prepares this producer's transactional bookkeeping before it starts processing:
        //   1. falls back to Semantic.NONE when checkpointing is disabled,
        //   2. obtains the "next transactional id hint" union list state (migrating the
        //      legacy state if one is registered),
        //   3. derives the transactional id prefix and builds the ids generator,
        //   4. for EXACTLY_ONCE, picks the hint to continue from, or aborts possibly
        //      lingering transactions when no hint is available,
        //   5. delegates to the superclass implementation.
        //
        // Throws IllegalStateException if more than one hint is found in the state.

        // Any semantic other than NONE is downgraded when checkpointing is off.
        if (semantic != FlinkKafkaProducer.Semantic.NONE
                && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn(
                    "Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
                    semantic,
                    FlinkKafkaProducer.Semantic.NONE);
            semantic = FlinkKafkaProducer.Semantic.NONE;
        }

        nextTransactionalIdHintState =
                context.getOperatorStateStore()
                        .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);

        // getRegisteredStateNames() yields state *names*, so the lookup has to use the
        // descriptor's name. Passing the descriptor object itself can never match a name,
        // which left the migration below unreachable.
        // NOTE(review): assumes NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a state descriptor
        // like its _V2 sibling (declaration is outside this method) - confirm.
        if (context.getOperatorStateStore()
                .getRegisteredStateNames()
                .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
            migrateNextTransactionalIdHindState(context);
        }

        // An explicitly configured prefix wins; otherwise build one from the (possibly
        // truncated) task name and the operator's unique id.
        String actualTransactionalIdPrefix;
        if (this.transactionalIdPrefix != null) {
            actualTransactionalIdPrefix = this.transactionalIdPrefix;
        } else {
            final String fullTaskName = getRuntimeContext().getTaskName();
            String taskName = fullTaskName;
            // Kafka transactional IDs are limited in length to be less than the max value of
            // a short, so we truncate here if necessary to a more reasonable length string.
            if (taskName.length() > maxTaskNameSize) {
                taskName = taskName.substring(0, maxTaskNameSize);
                LOG.warn(
                        "Truncated task name for Kafka TransactionalId from {} to {}.",
                        fullTaskName,
                        taskName);
            }
            actualTransactionalIdPrefix =
                    taskName
                            + "-"
                            + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
        }
        transactionalIdsGenerator =
                new TransactionalIdsGenerator(
                        actualTransactionalIdPrefix,
                        getRuntimeContext().getIndexOfThisSubtask(),
                        getRuntimeContext().getNumberOfParallelSubtasks(),
                        kafkaProducersPoolSize,
                        SAFE_SCALE_DOWN_FACTOR);

        // The hint is only tracked for EXACTLY_ONCE; every other semantic leaves it null.
        if (semantic != FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
            nextTransactionalIdHint = null;
        } else {
            ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
                    Lists.newArrayList(nextTransactionalIdHintState.get());
            if (transactionalIdHints.size() > 1) {
                throw new IllegalStateException(
                        "There should be at most one next transactional id hint written by the first subtask");
            } else if (transactionalIdHints.isEmpty()) {
                nextTransactionalIdHint = new FlinkKafkaProducer.NextTransactionalIdHint(0, 0);

                // this means that this is either:
                // (1) the first execution of this application
                // (2) previous execution has failed before first checkpoint completed
                //
                // in case of (2) we have to abort all previous transactions
                abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
            } else {
                nextTransactionalIdHint = transactionalIdHints.get(0);
            }
        }

        super.initializeState(context);
    }