func()

in fetch_request.go [99:164]


// encode serialises the fetch request into pe. Fields are written in a fixed
// order, and the version-gated ones are emitted only when r.Version is at
// least the version checked for them. The first error reported by pe or by a
// block's own encode method is returned immediately; nil is returned once
// every field has been written.
func (r *FetchRequest) encode(pe packetEncoder) (err error) {
	registry := pe.metricRegistry()

	// Fixed-size header. Clients always send -1 as the ReplicaID.
	pe.putInt32(-1)
	pe.putInt32(r.MaxWaitTime)
	pe.putInt32(r.MinBytes)
	if r.Version >= 3 {
		pe.putInt32(r.MaxBytes)
	}
	if r.Version >= 4 {
		pe.putInt8(int8(r.Isolation))
	}
	if r.Version >= 7 {
		pe.putInt32(r.SessionID)
		pe.putInt32(r.SessionEpoch)
	}

	// Requested topics, each followed by its per-partition blocks.
	if err = pe.putArrayLength(len(r.blocks)); err != nil {
		return err
	}
	for name, perPartition := range r.blocks {
		if err = pe.putString(name); err != nil {
			return err
		}
		if err = pe.putArrayLength(len(perPartition)); err != nil {
			return err
		}
		for id, blk := range perPartition {
			pe.putInt32(id)
			if err = blk.encode(pe, r.Version); err != nil {
				return err
			}
		}
		// The per-topic meter is marked only after all of the topic's blocks
		// have been encoded without error.
		getOrRegisterTopicMeter("consumer-fetch-rate", name, registry).Mark(1)
	}

	// From version 7 on, the entries of r.forgotten follow: topic name, then
	// the partition IDs listed for it.
	if r.Version >= 7 {
		if err = pe.putArrayLength(len(r.forgotten)); err != nil {
			return err
		}
		for name, ids := range r.forgotten {
			if err = pe.putString(name); err != nil {
				return err
			}
			if err = pe.putArrayLength(len(ids)); err != nil {
				return err
			}
			for _, id := range ids {
				pe.putInt32(id)
			}
		}
	}

	// From version 11 on, the rack ID is the final field.
	if r.Version >= 11 {
		if err = pe.putString(r.RackID); err != nil {
			return err
		}
	}

	return nil
}