protected void partitionConsumerRecordsHandler()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java [102:144]


    /**
     * Processes one batch of raw Kafka records that were fetched for a single partition.
     *
     * <p>Every record is decoded by {@code kafkaShuffleDeserializer} into a {@code
     * KafkaShuffleElement} and then handled by kind:
     *
     * <ul>
     *   <li><b>Record element</b>: the value is emitted through {@code sourceContext} together
     *       with a timestamp, and the partition's offset is set to the Kafka record's offset.
     *       Both steps are performed while holding {@code checkpointLock}.
     *   <li><b>Watermark element</b>: the watermark is handed to {@code watermarkHandler}; if the
     *       handler returns a watermark, it is emitted through {@code sourceContext}. The
     *       partition offset is not touched on this path.
     *   <li>Any other element is skipped.
     * </ul>
     *
     * @param partitionRecords the raw Kafka records to process, iterated in list order
     * @param partition state holder of the partition the records belong to; its offset is
     *     updated for every emitted record element
     * @throws Exception declared by the signature; no exception is caught inside this method, so
     *     anything thrown while deserializing or emitting reaches the caller
     */
    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition)
            throws Exception {

        for (ConsumerRecord<byte[], byte[]> consumerRecord : partitionRecords) {
            final KafkaShuffleElement shuffleElement =
                    kafkaShuffleDeserializer.deserialize(consumerRecord);

            // TODO: Do we need to check for end-of-stream when the end watermark is reached?
            // TODO: Currently, if one of the partitions sends an end-of-stream signal, the
            // fetcher stops running.
            // The "end of stream" logic in KafkaFetcher is a bit strange: if any partition has a
            // record flagged as END_OF_STREAM, the fetcher stops. That signal comes from the
            // deserializer, i.e. from the Kafka data itself, yet other topics and partitions may
            // still have data to read. Having finished Partition0 does not guarantee that
            // Partition1 is finished as well.

            if (shuffleElement.isRecord()) {
                // The timestamp is inherited from upstream:
                //  - ProcessTime:   the timestamp is ignored (upstream does not include one)
                //  - IngestionTime: the timestamp is overwritten
                //  - EventTime:     the timestamp is used
                synchronized (checkpointLock) {
                    final KafkaShuffleRecord<T> shuffleRecord = shuffleElement.asRecord();

                    // Use the Kafka record's own timestamp when the shuffle record has none.
                    final long emitTimestamp;
                    if (shuffleRecord.timestamp != null) {
                        emitTimestamp = shuffleRecord.timestamp;
                    } else {
                        emitTimestamp = consumerRecord.timestamp();
                    }

                    sourceContext.collectWithTimestamp(shuffleRecord.value, emitTimestamp);
                    // The offset is updated under the same lock as the emission.
                    partition.setOffset(consumerRecord.offset());
                }
                continue;
            }

            if (shuffleElement.isWatermark()) {
                final KafkaShuffleWatermark shuffleWatermark = shuffleElement.asWatermark();
                // The handler decides whether this input results in a watermark to emit.
                watermarkHandler
                        .checkAndGetNewWatermark(shuffleWatermark)
                        .ifPresent(sourceContext::emitWatermark);
            }
        }
    }