in clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java [1222:1348]
private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetch) {
TopicPartition tp = nextCompletedFetch.partition;
FetchResponse.PartitionData<Records> partition = nextCompletedFetch.partitionData;
long fetchOffset = nextCompletedFetch.nextFetchOffset;
CompletedFetch completedFetch = null;
Errors error = partition.error();
try {
if (!subscriptions.hasValidPosition(tp)) {
// this can happen when a rebalance happened while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
} else if (error == Errors.NONE) {
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
FetchPosition position = subscriptions.position(tp);
if (position == null || position.offset != fetchOffset) {
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
partition.records().sizeInBytes(), tp, position);
Iterator<? extends RecordBatch> batches = partition.records().batches().iterator();
completedFetch = nextCompletedFetch;
if (!batches.hasNext() && partition.records().sizeInBytes() > 0) {
if (completedFetch.responseVersion < 3) {
// Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
" and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
"newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
recordTooLargePartitions);
} else {
// This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
fetchOffset + ". Received a non-empty fetch response from the server, but no " +
"complete records were found.");
}
}
if (partition.highWatermark() >= 0) {
log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
subscriptions.updateHighWatermark(tp, partition.highWatermark());
}
if (partition.logStartOffset() >= 0) {
log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
}
if (partition.lastStableOffset() >= 0) {
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
}
if (partition.preferredReadReplica().isPresent()) {
subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica().get(), () -> {
long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
tp, partition.preferredReadReplica().get(), expireTimeMs);
return expireTimeMs;
});
}
nextCompletedFetch.initialized = true;
} else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
error == Errors.OFFSET_NOT_AVAILABLE) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
this.metadata.requestUpdate();
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
this.metadata.requestUpdate();
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
FetchPosition position = subscriptions.position(tp);
if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
"does not match the current offset {}", tp, fetchOffset, position);
} else {
handleOffsetOutOfRange(position, tp);
}
} else {
log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
//we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
log.warn("Not authorized to read from partition {}.", tp);
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown server error while fetching offset {} for topic-partition {}",
fetchOffset, tp);
} else if (error == Errors.CORRUPT_MESSAGE) {
throw new KafkaException("Encountered corrupt message when fetching offset "
+ fetchOffset
+ " for topic-partition "
+ tp);
} else {
throw new IllegalStateException("Unexpected error code "
+ error.code()
+ " while fetching at offset "
+ fetchOffset
+ " from topic-partition " + tp);
}
} finally {
if (completedFetch == null)
nextCompletedFetch.metricAggregator.record(tp, 0, 0);
if (error != Errors.NONE)
// we move the partition to the end if there was an error. This way, it's more likely that partitions for
// the same topic can remain together (allowing for more efficient serialization).
subscriptions.movePartitionToEnd(tp);
}
return completedFetch;
}