in core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala [77:236]
private def doLoad(
  tp: TopicPartition,
  coordinator: CoordinatorPlayback[T],
  future: CompletableFuture[LoadSummary],
  startTimeMs: Long
): Unit = {
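  // Time the load request spent waiting in the scheduler queue before this task started running.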
  val schedulerQueueTimeMs = time.milliseconds() - startTimeMs
  try {
    replicaManager.getLog(tp) match {
      case None =>
        future.completeExceptionally(new NotLeaderOrFollowerException(
          s"Could not load records from $tp because the log does not exist."))

      case Some(log) =>
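        // Re-evaluated on every iteration of the loading loop; falls back to -1 once the partition
        // is no longer online or led by this broker, which terminates the load (see check below).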
        def logEndOffset: Long = replicaManager.getLogEndOffset(tp).getOrElse(-1L)

        // Buffer may not be needed if records are read from memory.
        var buffer = ByteBuffer.allocate(0)
        // Loop breaks if leader changes at any time during the load, since logEndOffset is -1.
        var currentOffset = log.logStartOffset
        // Loop breaks if no records have been read, since the end of the log has been reached.
        // This is to ensure that the loop breaks even if the current offset remains smaller than
        // the log end offset but the log is empty. This could happen with compacted topics.
        var readAtLeastOneRecord = true

        var previousHighWatermark = -1L
        var numRecords = 0L
        var numBytes = 0L
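        // Replay the log from the start offset up to the log end offset. The loop also stops if
        // leadership is lost, if a read returns no data, or if the loader has been closed.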
        while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
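          // Read up to loadBufferSize bytes; the trailing `true` (minOneMessage) ensures that at
          // least one batch is returned even if it is larger than the configured buffer size.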
          val fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true)

          readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0

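          // Records may come back directly from memory or as a slice of a segment file. File-backed
          // records are copied into a heap buffer and wrapped as MemoryRecords before being replayed.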
          val memoryRecords = (fetchDataInfo.records: @unchecked) match {
            case records: MemoryRecords =>
              records

            case fileRecords: FileRecords =>
              val sizeInBytes = fileRecords.sizeInBytes
              val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
              // minOneMessage = true in the above log.read() means that the buffer may need to
              // be grown to ensure progress can be made.
              if (buffer.capacity < bytesNeeded) {
                if (loadBufferSize < bytesNeeded)
                  warn(s"Loaded metadata from $tp with buffer larger ($bytesNeeded bytes) than " +
                    s"configured buffer size ($loadBufferSize bytes).")

                buffer = ByteBuffer.allocate(bytesNeeded)
              } else {
                buffer.clear()
              }

              fileRecords.readInto(buffer, 0)
              MemoryRecords.readableRecords(buffer)
          }
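          // Replay every batch: control batches carry transaction markers, other batches carry
          // coordinator records.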
          memoryRecords.batches.forEach { batch =>
            if (batch.isControlBatch) {
              batch.asScala.foreach { record =>
                val controlRecord = ControlRecordType.parse(record.key)
                if (controlRecord == ControlRecordType.COMMIT) {
                  if (isTraceEnabled) {
                    trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to commit transaction " +
                      s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
                  }
                  coordinator.replayEndTransactionMarker(
                    batch.producerId,
                    batch.producerEpoch,
                    TransactionResult.COMMIT
                  )
                } else if (controlRecord == ControlRecordType.ABORT) {
                  if (isTraceEnabled) {
                    trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to abort transaction " +
                      s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
                  }
                  coordinator.replayEndTransactionMarker(
                    batch.producerId,
                    batch.producerEpoch,
                    TransactionResult.ABORT
                  )
                }
              }
            } else {
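              // Regular batch: deserialize each record and replay it against the coordinator.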
              batch.asScala.foreach { record =>
                numRecords = numRecords + 1

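                // Records with an unknown type are skipped (they could be left over from an
                // aborted upgrade); any other deserialization failure aborts the load.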
                val coordinatorRecordOpt = {
                  try {
                    Some(deserializer.deserialize(record.key, record.value))
                  } catch {
                    case ex: UnknownRecordTypeException =>
                      warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " +
                        s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.")
                      None

                    case ex: RuntimeException =>
                      val msg = s"Deserializing record $record from $tp failed due to: ${ex.getMessage}"
                      error(s"$msg.")
                      throw new RuntimeException(msg, ex)
                  }
                }

                coordinatorRecordOpt.foreach { coordinatorRecord =>
                  try {
                    if (isTraceEnabled) {
                      trace(s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
                        s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
                    }
                    coordinator.replay(
                      record.offset(),
                      batch.producerId,
                      batch.producerEpoch,
                      coordinatorRecord
                    )
                  } catch {
                    case ex: RuntimeException =>
                      val msg = s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
                        s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch} " +
                        s"failed due to: ${ex.getMessage}"
                      error(s"$msg.")
                      throw new RuntimeException(msg, ex)
                  }
                }
              }
            }

            // Note that the high watermark can be greater than the current offset but as we load more records
            // the current offset will eventually surpass the high watermark. Also note that the high watermark
            // will continue to advance while loading.
            currentOffset = batch.nextOffset
            val currentHighWatermark = log.highWatermark
            if (currentOffset >= currentHighWatermark) {
              coordinator.updateLastWrittenOffset(currentOffset)

              if (currentHighWatermark > previousHighWatermark) {
                coordinator.updateLastCommittedOffset(currentHighWatermark)
                previousHighWatermark = currentHighWatermark
              }
            }
          }

          numBytes = numBytes + memoryRecords.sizeInBytes()
        }
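        // Complete the future: leadership was lost, the load finished successfully, or the
        // loader was closed while loading.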
        val endTimeMs = time.milliseconds()

        if (logEndOffset == -1L) {
          future.completeExceptionally(new NotLeaderOrFollowerException(
            s"Stopped loading records from $tp because the partition is not online or is no longer the leader."
          ))
        } else if (isRunning.get) {
          future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, numRecords, numBytes))
        } else {
          future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
        }
    }
  } catch {
    case ex: Throwable =>
      future.completeExceptionally(ex)
  }
}