in offset_fetch_request.go [47:118]
// encode serializes the request into pe.
//
// Layout, in the order written:
//  1. the consumer group name;
//  2. the length header for the topic list;
//  3. for every entry in r.partitions: the topic name followed by its
//     partition ids;
//  4. the RequireStable flag (version 7 and later only).
//
// Versions 6 and later are treated as "flexible": strings and arrays use
// their compact encodings, and an empty tagged-field section is written
// after each topic entry and at the end of the request.
//
// A PacketEncodingError is returned when Version is outside 0..7, or when
// RequireStable is set on a version older than 7. Errors reported by the
// encoder for strings and arrays are returned unchanged.
func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
	if r.Version < 0 || r.Version > 7 {
		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
	}

	flexible := r.Version >= 6

	// Consumer group name.
	if flexible {
		if err = pe.putCompactString(r.ConsumerGroup); err != nil {
			return err
		}
	} else if err = pe.putString(r.ConsumerGroup); err != nil {
		return err
	}

	// Topic list header. A nil partitions map gets a dedicated marker
	// (UVarint 0 when flexible, int32 -1 from version 2 on) instead of a
	// regular length.
	// NOTE(review): presumably these markers are the protocol's "null array"
	// form — confirm against the Kafka protocol guide.
	switch {
	case flexible && r.partitions == nil:
		pe.putUVarint(0)
	case flexible:
		pe.putCompactArrayLength(len(r.partitions))
	case r.partitions == nil && r.Version >= 2:
		pe.putInt32(-1)
	default:
		if err = pe.putArrayLength(len(r.partitions)); err != nil {
			return err
		}
	}

	// One entry per topic: name, then partition ids.
	for name, ids := range r.partitions {
		if flexible {
			if err = pe.putCompactString(name); err != nil {
				return err
			}
			if err = pe.putCompactInt32Array(ids); err != nil {
				return err
			}
			// Flexible versions close every topic entry with tagged fields.
			pe.putEmptyTaggedFieldArray()
			continue
		}

		if err = pe.putString(name); err != nil {
			return err
		}
		if err = pe.putInt32Array(ids); err != nil {
			return err
		}
	}

	// RequireStable only exists on the wire from version 7; asking for it on
	// an older version is rejected rather than silently dropped.
	if r.Version < 7 {
		if r.RequireStable {
			return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
		}
	} else {
		pe.putBool(r.RequireStable)
	}

	// Request-level tagged fields.
	if flexible {
		pe.putEmptyTaggedFieldArray()
	}

	return nil
}