func()

in offset_fetch_request.go [47:118]


// encode writes the body of the OffsetFetchRequest to pe, choosing the field
// encodings from r.Version.
//
// Versions outside the range 0..7 are rejected before anything is written.
// From version 6 on ("flexible" in this function) the compact string/array
// encoder methods are used and an empty tagged-field array is emitted after
// each topic entry and once more at the end of the request. Version 7 and
// later additionally write the RequireStable flag.
//
// It returns a PacketEncodingError for an unsupported version or when
// RequireStable is set on a version below 7, and otherwise forwards the first
// error reported by pe. Note that the RequireStable check runs only after the
// consumer group and topic data have already been handed to pe.
func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
	if r.Version < 0 || r.Version > 7 {
		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
	}

	flexible := r.Version >= 6

	// Select the string and int32-array writers once, so the rest of the
	// function does not have to branch on the version for every field.
	writeString, writeInt32s := pe.putString, pe.putInt32Array
	if flexible {
		writeString, writeInt32s = pe.putCompactString, pe.putCompactInt32Array
	}

	if err = writeString(r.ConsumerGroup); err != nil {
		return err
	}

	// Topic count. A nil map is written differently from an empty one wherever
	// the version allows it.
	// NOTE(review): presumably a nil map stands for "all topics" in the Kafka
	// protocol — confirm against the protocol guide.
	switch {
	case flexible && r.partitions == nil:
		pe.putUVarint(0)
	case flexible:
		pe.putCompactArrayLength(len(r.partitions))
	case r.partitions == nil && r.Version >= 2:
		pe.putInt32(-1)
	default:
		// Versions 0 and 1 with a nil map end up here too, with a length of 0.
		if err = pe.putArrayLength(len(r.partitions)); err != nil {
			return err
		}
	}

	// One entry per topic: its name, then its partition IDs. Map iteration
	// order is unspecified, so the order of entries may differ between calls.
	for name, ids := range r.partitions {
		if err = writeString(name); err != nil {
			return err
		}
		if err = writeInt32s(ids); err != nil {
			return err
		}
		if flexible {
			pe.putEmptyTaggedFieldArray()
		}
	}

	if r.Version >= 7 {
		pe.putBool(r.RequireStable)
	} else if r.RequireStable {
		return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
	}

	if flexible {
		pe.putEmptyTaggedFieldArray()
	}

	return nil
}