in core/src/main/scala/kafka/tools/DumpLogSegments.scala [262:352]
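  /**
   * Dump a single log segment or KRaft snapshot file: print a file-type header,
   * then batch-level metadata and, when deep iteration is enabled, per-record
   * metadata and (optionally) record contents decoded by the given parser.
   * Non-consecutive offset pairs are accumulated into
   * nonConsecutivePairsForLogFilesMap for the caller to report.
   */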
  private def dumpLog(file: File,
                      printContents: Boolean,
                      nonConsecutivePairsForLogFilesMap: mutable.Map[String, List[(Long, Long)]],
                      isDeepIteration: Boolean,
                      parser: MessageParser[_, _],
                      skipRecordMetadata: Boolean,
                      maxBytes: Int): Unit = {
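    // Header line depends on the file type: log segments are named after their
    // base offset, while KRaft snapshot files carry an end offset and epoch.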
    if (file.getName.endsWith(UnifiedLog.LOG_FILE_SUFFIX)) {
      val startOffset = file.getName.split("\\.")(0).toLong
      println(s"Log starting offset: $startOffset")
    } else if (file.getName.endsWith(Snapshots.SUFFIX)) {
      if (file.getName == BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME) {
        println("KRaft bootstrap snapshot")
      } else {
        val path = Snapshots.parse(file.toPath).get()
        println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: ${path.snapshotId.epoch}")
      }
    }
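    // Open the file read-only and cap the readable view at maxBytes, so huge
    // segments can be dumped partially.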
    val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
    try {
      var validBytes = 0L
      var lastOffset = -1L
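      // Iterate batches in file order, accumulating the byte count of complete
      // batches so trailing partial data can be detected at the end.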
      for (batch <- fileRecords.batches.asScala) {
        printBatchLevel(batch, validBytes)
        if (isDeepIteration) {
          for (record <- batch.asScala) {
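            // Flag gaps between consecutive record offsets; the collected pairs
            // are reported per file once the dump completes.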
            if (lastOffset == -1)
              lastOffset = record.offset
            else if (record.offset != lastOffset + 1) {
              var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
              nonConsecutivePairsSeq ::= (lastOffset, record.offset)
              nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
            }
            lastOffset = record.offset
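            // Per-record metadata: offset, timestamp, key/value sizes, plus the
            // producer sequence and header keys for magic v2+ batches.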
            var prefix = s"$RecordIndent "
            if (!skipRecordMetadata) {
              print(s"${prefix}offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " +
                s"keySize: ${record.keySize} valueSize: ${record.valueSize}")
              prefix = " "
              if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
                print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
              }
              record match {
                case r: AbstractLegacyRecordBatch => print(s" isValid: ${r.isValid} crc: ${r.checksum}")
                case _ =>
              }
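              // Control batches hold cluster-internal records (transaction
              // markers and KRaft metadata records); decode by control type.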
              if (batch.isControlBatch) {
                val controlTypeId = ControlRecordType.parseTypeId(record.key)
                ControlRecordType.fromTypeId(controlTypeId) match {
                  case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
                    val endTxnMarker = EndTransactionMarker.deserialize(record)
                    print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
                  case ControlRecordType.SNAPSHOT_HEADER =>
                    val header = ControlRecordUtils.deserializeSnapshotHeaderRecord(record)
                    print(s" SnapshotHeader ${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
                  case ControlRecordType.SNAPSHOT_FOOTER =>
                    val footer = ControlRecordUtils.deserializeSnapshotFooterRecord(record)
                    print(s" SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
                  case ControlRecordType.KRAFT_VERSION =>
                    val kraftVersion = ControlRecordUtils.deserializeKRaftVersionRecord(record)
                    print(s" KRaftVersion ${KRaftVersionRecordJsonConverter.write(kraftVersion, kraftVersion.version())}")
                  case ControlRecordType.KRAFT_VOTERS =>
                    val voters = ControlRecordUtils.deserializeVotersRecord(record)
                    print(s" KRaftVoters ${VotersRecordJsonConverter.write(voters, voters.version())}")
                  case controlType =>
                    print(s" controlType: $controlType($controlTypeId)")
                }
              }
            }
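            // When content printing is enabled, decode regular records through
            // the supplied MessageParser and print the key/payload it returns.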
            if (printContents && !batch.isControlBatch) {
              val (key, payload) = parser.parse(record)
              key.foreach { key =>
                print(s"${prefix}key: $key")
                prefix = " "
              }
              payload.foreach(payload => print(s" payload: $payload"))
            }
            println()
          }
        }
        validBytes += batch.sizeInBytes
      }
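      // Any bytes after the last complete batch are invalid; they are only
      // reported when maxBytes is Integer.MAX_VALUE, i.e. the full file was read.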
      val trailingBytes = fileRecords.sizeInBytes - validBytes
      if (trailingBytes > 0 && maxBytes == Integer.MAX_VALUE)
        println(s"Found $trailingBytes invalid bytes at the end of ${file.getName}")
    } finally fileRecords.closeHandlers()
  }
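
  // A minimal sketch, not part of DumpLogSegments.scala, showing how a custom
  // MessageParser could feed the parser.parse(record) call above. The class
  // name Utf8MessageParser is hypothetical; it assumes the MessageParser[K, V]
  // trait defined elsewhere in this file (whose parse returns
  // (Option[K], Option[V])) and an import of java.nio.charset.StandardCharsets.
  private class Utf8MessageParser extends MessageParser[String, String] {
    override def parse(record: Record): (Option[String], Option[String]) = {
      // Record.key/value are ByteBuffers (null when absent), so guard with
      // hasKey/hasValue before decoding them as UTF-8 text.
      val key = if (record.hasKey) Some(StandardCharsets.UTF_8.decode(record.key).toString) else None
      val value = if (record.hasValue) Some(StandardCharsets.UTF_8.decode(record.value).toString) else None
      (key, value)
    }
  }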