func()

in fetch_response.go [67:187]


// decode populates b from the wire data read through pd, using the field
// layout selected by version.
//
// Fields are consumed in this order: the int16 error code, the int64 high
// water mark offset, then for version >= 4 the last stable offset, the log
// start offset (version >= 5 only) and the aborted transactions array, then
// for version >= 11 the preferred read replica, and finally an int32 size
// followed by a subset of that many bytes holding the records. For versions
// below 11 PreferredReadReplica is set to -1.
//
// Errors from the decoder are returned unchanged, with one exception:
// ErrInsufficientData reported while decoding a records entry ends the records
// loop without failing the call.
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
	// The size histogram is optional; it is only looked up when the decoder
	// exposes a metric registry.
	var sizeMetric metrics.Histogram
	if registry := pd.metricRegistry(); registry != nil {
		sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", registry)
	}

	errCode, err := pd.getInt16()
	if err != nil {
		return err
	}
	b.Err = KError(errCode)

	if b.HighWaterMarkOffset, err = pd.getInt64(); err != nil {
		return err
	}

	if version >= 4 {
		if b.LastStableOffset, err = pd.getInt64(); err != nil {
			return err
		}

		if version >= 5 {
			if b.LogStartOffset, err = pd.getInt64(); err != nil {
				return err
			}
		}

		abortedCount, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		// A negative length leaves AbortedTransactions untouched.
		if abortedCount >= 0 {
			aborted := make([]*AbortedTransaction, abortedCount)
			b.AbortedTransactions = aborted
			for idx := range aborted {
				entry := new(AbortedTransaction)
				if err = entry.decode(pd); err != nil {
					return err
				}
				aborted[idx] = entry
			}
		}
	}

	if version < 11 {
		b.PreferredReadReplica = -1
	} else if b.PreferredReadReplica, err = pd.getInt32(); err != nil {
		return err
	}

	size, err := pd.getInt32()
	if err != nil {
		return err
	}
	if sizeMetric != nil {
		sizeMetric.Update(int64(size))
	}

	sub, err := pd.getSubset(int(size))
	if err != nil {
		return err
	}

	b.RecordsSet = make([]*Records, 0)

	for sub.remaining() > 0 {
		batch := &Records{}
		if decodeErr := batch.decode(sub); decodeErr != nil {
			if !errors.Is(decodeErr, ErrInsufficientData) {
				return decodeErr
			}
			// The data ran out in the middle of an entry: stop reading, and
			// flag the block as partial only when nothing has been kept yet.
			if len(b.RecordsSet) == 0 {
				b.Partial = true
			}
			break
		}

		if b.LastRecordsBatchOffset, err = batch.recordsOffset(); err != nil {
			return err
		}

		truncated, err := batch.isPartial()
		if err != nil {
			return err
		}

		count, err := batch.numRecords()
		if err != nil {
			return err
		}

		// Keep entries that carry records, plus a partial entry when it would
		// be the first one kept.
		if count > 0 || (truncated && len(b.RecordsSet) == 0) {
			b.RecordsSet = append(b.RecordsSet, batch)

			// Records always points at the first kept entry.
			if b.Records == nil {
				b.Records = batch
			}
		}

		overflowed, err := batch.isOverflow()
		if err != nil {
			return err
		}

		// Nothing usable follows a partial or overflowing entry.
		if truncated || overflowed {
			break
		}
	}

	return nil
}