in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java [142:214]
protected AbstractFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
MetricGroup consumerMetricGroup,
boolean useMetrics)
throws Exception {
this.sourceContext = checkNotNull(sourceContext);
this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext);
this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
this.checkpointLock = sourceContext.getCheckpointLock();
this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
this.useMetrics = useMetrics;
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
this.legacyCurrentOffsetsMetricGroup =
consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
this.legacyCommittedOffsetsMetricGroup =
consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
this.watermarkStrategy = watermarkStrategy;
if (watermarkStrategy == null) {
timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
} else {
timestampWatermarkMode = WITH_WATERMARK_GENERATOR;
}
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates =
createPartitionStateHolders(
seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarkStrategy,
userCodeClassLoader);
// check that all seed partition states have a defined offset
for (KafkaTopicPartitionState<?, ?> partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
throw new IllegalArgumentException(
"The fetcher was assigned seed partitions with undefined initial offsets.");
}
}
// all seed partitions are not assigned yet, so should be added to the unassigned partitions
// queue
for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {
unassignedPartitionsQueue.add(partition);
}
// register metrics for the initial seed partitions
if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
}
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
new PeriodicWatermarkEmitter<>(
checkpointLock,
subscribedPartitionStates,
watermarkOutputMultiplexer,
processingTimeProvider,
autoWatermarkInterval);
periodicEmitter.start();
}
}