func()

in offset_commit_request.go [130:190]


func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	if r.ConsumerGroup, err = pd.getString(); err != nil {
		return err
	}

	if r.Version >= 1 {
		if r.ConsumerGroupGeneration, err = pd.getInt32(); err != nil {
			return err
		}
		if r.ConsumerID, err = pd.getString(); err != nil {
			return err
		}
	}

	// Version 5 removes RetentionTime, which is now controlled only by a broker configuration.
	if r.Version >= 2 && r.Version <= 4 {
		if r.RetentionTime, err = pd.getInt64(); err != nil {
			return err
		}
	}

	if r.Version >= 7 {
		if r.GroupInstanceId, err = pd.getNullableString(); err != nil {
			return err
		}
	}

	topicCount, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if topicCount == 0 {
		return nil
	}
	r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
	for i := 0; i < topicCount; i++ {
		topic, err := pd.getString()
		if err != nil {
			return err
		}
		partitionCount, err := pd.getArrayLength()
		if err != nil {
			return err
		}
		r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
		for j := 0; j < partitionCount; j++ {
			partition, err := pd.getInt32()
			if err != nil {
				return err
			}
			block := &offsetCommitRequestBlock{}
			if err := block.decode(pd, r.Version); err != nil {
				return err
			}
			r.blocks[topic][partition] = block
		}
	}
	return nil
}