private def dumpLog()

in core/src/main/scala/kafka/tools/DumpLogSegments.scala [262:352]


  private def dumpLog(file: File,
                      printContents: Boolean,
                      nonConsecutivePairsForLogFilesMap: mutable.Map[String, List[(Long, Long)]],
                      isDeepIteration: Boolean,
                      parser: MessageParser[_, _],
                      skipRecordMetadata: Boolean,
                      maxBytes: Int): Unit = {
    if (file.getName.endsWith(UnifiedLog.LOG_FILE_SUFFIX)) {
      val startOffset = file.getName.split("\\.")(0).toLong
      println(s"Log starting offset: $startOffset")
    } else if (file.getName.endsWith(Snapshots.SUFFIX)) {
      if (file.getName == BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME) {
        println("KRaft bootstrap snapshot")
      } else {
        val path = Snapshots.parse(file.toPath).get()
        println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: ${path.snapshotId.epoch}")
      }
    }
    val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
    try {
      var validBytes = 0L
      var lastOffset = -1L

      for (batch <- fileRecords.batches.asScala) {
        printBatchLevel(batch, validBytes)
        if (isDeepIteration) {
          for (record <- batch.asScala) {
            if (lastOffset == -1)
              lastOffset = record.offset
            else if (record.offset != lastOffset + 1) {
              var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
              nonConsecutivePairsSeq ::= (lastOffset, record.offset)
              nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
            }
            lastOffset = record.offset

            var prefix = s"$RecordIndent "
            if (!skipRecordMetadata) {
              print(s"${prefix}offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " +
                s"keySize: ${record.keySize} valueSize: ${record.valueSize}")
              prefix = " "

              if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
                print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
              }
              record match {
                case r: AbstractLegacyRecordBatch => print(s" isValid: ${r.isValid} crc: ${r.checksum}}")
                case _ =>
              }

              if (batch.isControlBatch) {
                val controlTypeId = ControlRecordType.parseTypeId(record.key)
                ControlRecordType.fromTypeId(controlTypeId) match {
                  case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
                    val endTxnMarker = EndTransactionMarker.deserialize(record)
                    print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
                  case ControlRecordType.SNAPSHOT_HEADER =>
                    val header = ControlRecordUtils.deserializeSnapshotHeaderRecord(record)
                    print(s" SnapshotHeader ${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
                  case ControlRecordType.SNAPSHOT_FOOTER =>
                    val footer = ControlRecordUtils.deserializeSnapshotFooterRecord(record)
                    print(s" SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
                  case ControlRecordType.KRAFT_VERSION =>
                    val kraftVersion = ControlRecordUtils.deserializeKRaftVersionRecord(record)
                    print(s" KRaftVersion ${KRaftVersionRecordJsonConverter.write(kraftVersion, kraftVersion.version())}")
                  case ControlRecordType.KRAFT_VOTERS=>
                    val voters = ControlRecordUtils.deserializeVotersRecord(record)
                    print(s" KRaftVoters ${VotersRecordJsonConverter.write(voters, voters.version())}")
                  case controlType =>
                    print(s" controlType: $controlType($controlTypeId)")
                }
              }
            }
            if (printContents && !batch.isControlBatch) {
              val (key, payload) = parser.parse(record)
              key.foreach { key =>
                print(s"${prefix}key: $key")
                prefix = " "
              }
              payload.foreach(payload => print(s" payload: $payload"))
            }
            println()
          }
        }
        validBytes += batch.sizeInBytes
      }
      val trailingBytes = fileRecords.sizeInBytes - validBytes
      if ( (trailingBytes > 0) && (maxBytes == Integer.MAX_VALUE) )
        println(s"Found $trailingBytes invalid bytes at the end of ${file.getName}")
    } finally fileRecords.closeHandlers()
  }