in record_batch.go [102:196]
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
if b.FirstOffset, err = pd.getInt64(); err != nil {
return err
}
batchLen, err := pd.getInt32()
if err != nil {
return err
}
if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
if b.Version, err = pd.getInt8(); err != nil {
return err
}
crc32Decoder := acquireCrc32Field(crcCastagnoli)
defer releaseCrc32Field(crc32Decoder)
if err = pd.push(crc32Decoder); err != nil {
return err
}
attributes, err := pd.getInt16()
if err != nil {
return err
}
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
b.Control = attributes&controlMask == controlMask
b.LogAppendTime = attributes×tampTypeMask == timestampTypeMask
b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
return err
}
if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
return err
}
if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
return err
}
if b.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if b.ProducerEpoch, err = pd.getInt16(); err != nil {
return err
}
if b.FirstSequence, err = pd.getInt32(); err != nil {
return err
}
numRecs, err := pd.getArrayLength()
if err != nil {
return err
}
if numRecs >= 0 {
b.Records = make([]*Record, numRecs)
}
bufSize := int(batchLen) - recordBatchOverhead
recBuffer, err := pd.getRawBytes(bufSize)
if err != nil {
if errors.Is(err, ErrInsufficientData) {
b.PartialTrailingRecord = true
b.Records = nil
return nil
}
return err
}
if err = pd.pop(); err != nil {
return err
}
recBuffer, err = decompress(b.Codec, recBuffer)
if err != nil {
return err
}
b.recordsLen = len(recBuffer)
err = decode(recBuffer, recordsArray(b.Records), nil)
if errors.Is(err, ErrInsufficientData) {
b.PartialTrailingRecord = true
b.Records = nil
return nil
}
return err
}