in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [1153:1223]
public void initializeState(FunctionInitializationContext context) throws Exception {
if (semantic != FlinkKafkaProducer.Semantic.NONE
&& !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn(
"Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
semantic,
FlinkKafkaProducer.Semantic.NONE);
semantic = FlinkKafkaProducer.Semantic.NONE;
}
nextTransactionalIdHintState =
context.getOperatorStateStore()
.getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
if (context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
migrateNextTransactionalIdHindState(context);
}
String actualTransactionalIdPrefix;
if (this.transactionalIdPrefix != null) {
actualTransactionalIdPrefix = this.transactionalIdPrefix;
} else {
String taskName = getRuntimeContext().getTaskName();
// Kafka transactional IDs are limited in length to be less than the max value of
// a short, so we truncate here if necessary to a more reasonable length string.
if (taskName.length() > maxTaskNameSize) {
taskName = taskName.substring(0, maxTaskNameSize);
LOG.warn(
"Truncated task name for Kafka TransactionalId from {} to {}.",
getRuntimeContext().getTaskName(),
taskName);
}
actualTransactionalIdPrefix =
taskName
+ "-"
+ ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
}
transactionalIdsGenerator =
new TransactionalIdsGenerator(
actualTransactionalIdPrefix,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
kafkaProducersPoolSize,
SAFE_SCALE_DOWN_FACTOR);
if (semantic != FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
nextTransactionalIdHint = null;
} else {
ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
Lists.newArrayList(nextTransactionalIdHintState.get());
if (transactionalIdHints.size() > 1) {
throw new IllegalStateException(
"There should be at most one next transactional id hint written by the first subtask");
} else if (transactionalIdHints.size() == 0) {
nextTransactionalIdHint = new FlinkKafkaProducer.NextTransactionalIdHint(0, 0);
// this means that this is either:
// (1) the first execution of this application
// (2) previous execution has failed before first checkpoint completed
//
// in case of (2) we have to abort all previous transactions
abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
} else {
nextTransactionalIdHint = transactionalIdHints.get(0);
}
}
super.initializeState(context);
}