in fetch_response.go [67:187]
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
metricRegistry := pd.metricRegistry()
var sizeMetric metrics.Histogram
if metricRegistry != nil {
sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry)
}
tmp, err := pd.getInt16()
if err != nil {
return err
}
b.Err = KError(tmp)
b.HighWaterMarkOffset, err = pd.getInt64()
if err != nil {
return err
}
if version >= 4 {
b.LastStableOffset, err = pd.getInt64()
if err != nil {
return err
}
if version >= 5 {
b.LogStartOffset, err = pd.getInt64()
if err != nil {
return err
}
}
numTransact, err := pd.getArrayLength()
if err != nil {
return err
}
if numTransact >= 0 {
b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
}
for i := 0; i < numTransact; i++ {
transact := new(AbortedTransaction)
if err = transact.decode(pd); err != nil {
return err
}
b.AbortedTransactions[i] = transact
}
}
if version >= 11 {
b.PreferredReadReplica, err = pd.getInt32()
if err != nil {
return err
}
} else {
b.PreferredReadReplica = -1
}
recordsSize, err := pd.getInt32()
if err != nil {
return err
}
if sizeMetric != nil {
sizeMetric.Update(int64(recordsSize))
}
recordsDecoder, err := pd.getSubset(int(recordsSize))
if err != nil {
return err
}
b.RecordsSet = []*Records{}
for recordsDecoder.remaining() > 0 {
records := &Records{}
if err := records.decode(recordsDecoder); err != nil {
// If we have at least one decoded records, this is not an error
if errors.Is(err, ErrInsufficientData) {
if len(b.RecordsSet) == 0 {
b.Partial = true
}
break
}
return err
}
b.LastRecordsBatchOffset, err = records.recordsOffset()
if err != nil {
return err
}
partial, err := records.isPartial()
if err != nil {
return err
}
n, err := records.numRecords()
if err != nil {
return err
}
if n > 0 || (partial && len(b.RecordsSet) == 0) {
b.RecordsSet = append(b.RecordsSet, records)
if b.Records == nil {
b.Records = records
}
}
overflow, err := records.isOverflow()
if err != nil {
return err
}
if partial || overflow {
break
}
}
return nil
}