in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java [102:144]
    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition)
            throws Exception {
        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
            final KafkaShuffleElement element = kafkaShuffleDeserializer.deserialize(record);
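            // Each Kafka record is either a data element or a watermark, both written into
            // the shuffle topic by the upstream producer.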
            // TODO: Do we need to check for end of stream if the end watermark is reached?
            // TODO: Currently, if one of the partitions sends an end-of-stream signal, the
            // fetcher stops running.
            // The current "end of stream" logic in KafkaFetcher is a bit strange: if any
            // partition has a record signaled as "END_OF_STREAM", the fetcher stops running.
            // Note that the signal comes from the deserializer, i.e. from the Kafka data
            // itself, but other topics and partitions may still have data to read. Finishing
            // Partition0 does not guarantee that Partition1 also finishes.
            if (element.isRecord()) {
                // The timestamp is inherited from upstream.
                // If using ProcessingTime, the timestamp is ignored (upstream does not attach
                // a timestamp either).
                // If using IngestionTime, the timestamp is overwritten.
                // If using EventTime, the timestamp is used as-is.
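                // Emit the record and advance the partition offset atomically under the
                // checkpoint lock, so a checkpoint never captures an offset ahead of what
                // has already been emitted.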
                synchronized (checkpointLock) {
                    KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
                    sourceContext.collectWithTimestamp(
                            elementAsRecord.value,
                            elementAsRecord.timestamp == null
                                    ? record.timestamp()
                                    : elementAsRecord.timestamp);
                    partition.setOffset(record.offset());
                }
            } else if (element.isWatermark()) {
                final KafkaShuffleWatermark watermark = element.asWatermark();
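                // Watermarks arrive per producer subtask; the handler aggregates them and
                // returns a new combined watermark only when the overall minimum advances.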
                Optional<Watermark> newWatermark =
                        watermarkHandler.checkAndGetNewWatermark(watermark);
                newWatermark.ifPresent(sourceContext::emitWatermark);
            }
        }
    }