in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java [757:830]
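
/**
 * The main work method of the source. Creates a Kafka fetcher for the seed
 * partitions determined in open(), then runs the fetch loop (optionally
 * alongside the partition discovery loop) until the consumer is cancelled.
 */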
public void run(SourceContext<T> sourceContext) throws Exception {
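    // subscribedPartitionsToStartOffsets is seeded in open(); a null map here
    // means the consumer was never properly initialized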
    if (subscribedPartitionsToStartOffsets == null) {
        throw new Exception("The partitions were not set for the consumer");
    }

    // initialize commit metrics and default offset callback method
    this.successfulCommits =
            this.getRuntimeContext()
                    .getMetricGroup()
                    .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
    this.failedCommits =
            this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
    final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
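
    // this callback is invoked by the fetcher whenever an asynchronous offset
    // commit to Kafka completes, so the commit metrics are updated from there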
    this.offsetCommitCallback =
            new KafkaCommitCallback() {
                @Override
                public void onSuccess() {
                    successfulCommits.inc();
                }

                @Override
                public void onException(Throwable cause) {
                    LOG.warn(
                            String.format(
                                    "Consumer subtask %d failed async Kafka commit.",
                                    subtaskIndex),
                            cause);
                    failedCommits.inc();
                }
            };

    // mark the subtask as temporarily idle if there are no initial seed partitions;
    // once this subtask discovers some partitions and starts collecting records, its
    // status automatically switches back to active.
    if (subscribedPartitionsToStartOffsets.isEmpty()) {
        sourceContext.markAsTemporarilyIdle();
    }

    LOG.info(
            "Consumer subtask {} creating fetcher with offsets {}.",
            getRuntimeContext().getIndexOfThisSubtask(),
            subscribedPartitionsToStartOffsets);

    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)
    this.kafkaFetcher =
            createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    watermarkStrategy,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode,
                    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                    useMetrics);
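
    // cancel() may have been called (and 'running' flipped to false) while the
    // fetcher was being created; in that case, exit before starting the fetch loop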
    if (!running) {
        return;
    }

    // depending on whether we were restored with the current state version (Flink 1.3+),
    // the remaining logic branches into two paths:
    //  1) New state - the partition discovery loop runs in a separate thread, while this
    //     thread runs the main fetcher loop
    //  2) Old state - partition discovery is disabled and only the main fetcher loop runs
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        kafkaFetcher.runFetchLoop();
    } else {
        runWithPartitionDiscovery();
    }
}
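
// For context: a minimal sketch of the shape of runWithPartitionDiscovery(),
// which lives further down in this class, past the end of this excerpt. This is
// an illustration, not the verbatim implementation; the helper names
// (createAndStartDiscoveryLoop, joinDiscoveryLoopThread) follow the surrounding
// class.
//
//     private void runWithPartitionDiscovery() throws Exception {
//         // run the discovery loop in a dedicated thread, collecting its error
//         final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
//         createAndStartDiscoveryLoop(discoveryLoopErrorRef);
//
//         // this thread runs the main fetch loop until cancellation or failure
//         kafkaFetcher.runFetchLoop();
//
//         // wake the discoverer so the discovery thread can exit, then join it
//         partitionDiscoverer.wakeup();
//         joinDiscoveryLoopThread();
//
//         // surface any error raised by the discovery loop
//         final Exception discoveryLoopError = discoveryLoopErrorRef.get();
//         if (discoveryLoopError != null) {
//             throw new RuntimeException(discoveryLoopError);
//         }
//     }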