in produce_request.go [141:201]
func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
r.Version = version
if version >= 3 {
id, err := pd.getNullableString()
if err != nil {
return err
}
r.TransactionalID = id
}
requiredAcks, err := pd.getInt16()
if err != nil {
return err
}
r.RequiredAcks = RequiredAcks(requiredAcks)
if r.Timeout, err = pd.getInt32(); err != nil {
return err
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
}
if topicCount == 0 {
return nil
}
r.records = make(map[string]map[int32]Records)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
r.records[topic] = make(map[int32]Records)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
size, err := pd.getInt32()
if err != nil {
return err
}
recordsDecoder, err := pd.getSubset(int(size))
if err != nil {
return err
}
var records Records
if err := records.decode(recordsDecoder); err != nil {
return err
}
r.records[topic][partition] = records
}
}
return nil
}