metadata_response.go (441 lines of code) (raw):
package sarama
import "time"
// PartitionMetadata describes one partition of a topic as carried inside a
// TopicMetadata entry. Which fields are read from / written to the wire
// depends on the protocol version handed to decode and encode.
type PartitionMetadata struct {
// Version defines the protocol version to use for encode and decode.
// Both decode and encode overwrite it with the version they are called with.
Version int16
// Err contains the partition error, or 0 if there was no error.
Err KError
// ID contains the partition index.
ID int32
// Leader contains the ID of the leader broker.
Leader int32
// LeaderEpoch contains the leader epoch of this partition.
// It is only decoded and encoded when Version >= 7.
LeaderEpoch int32
// Replicas contains the set of all nodes that host this partition.
Replicas []int32
// Isr contains the set of nodes that are in sync with the leader for this partition.
Isr []int32
// OfflineReplicas contains the set of offline replicas of this partition.
// It is only decoded and encoded when Version >= 5.
OfflineReplicas []int32
}
// decode fills p from pd using the field layout of the given protocol version.
//
// Fields are read in wire order: error code, partition ID, leader,
// leader epoch (version >= 7), replicas, ISR, offline replicas (version >= 5).
// From version 9 on the int32 arrays use the compact encoding and the entry
// ends with a tagged-field section. The first decoder error aborts the read
// and is returned unchanged.
func (p *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
	p.Version = version

	// readInt32s selects the array encoding that matches the version.
	readInt32s := func() ([]int32, error) {
		if version >= 9 {
			return pd.getCompactInt32Array()
		}
		return pd.getInt32Array()
	}

	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	p.Err = KError(code)

	if p.ID, err = pd.getInt32(); err != nil {
		return err
	}
	if p.Leader, err = pd.getInt32(); err != nil {
		return err
	}
	if version >= 7 {
		if p.LeaderEpoch, err = pd.getInt32(); err != nil {
			return err
		}
	}

	if p.Replicas, err = readInt32s(); err != nil {
		return err
	}
	if p.Isr, err = readInt32s(); err != nil {
		return err
	}
	if version >= 5 {
		if p.OfflineReplicas, err = readInt32s(); err != nil {
			return err
		}
	}

	if version >= 9 {
		if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
			return err
		}
	}
	return nil
}
// encode writes p to pe using the field layout of the given protocol version.
//
// Fields are written in wire order: error code, partition ID, leader,
// leader epoch (version >= 7), replicas, ISR, offline replicas (version >= 5).
// From version 9 on the int32 arrays use the compact encoding and an empty
// tagged-field section is appended. The first encoder error is returned
// unchanged.
func (p *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
	p.Version = version

	// writeInt32s selects the array encoding that matches the version.
	writeInt32s := func(ids []int32) error {
		if version >= 9 {
			return pe.putCompactInt32Array(ids)
		}
		return pe.putInt32Array(ids)
	}

	pe.putInt16(int16(p.Err))
	pe.putInt32(p.ID)
	pe.putInt32(p.Leader)
	if version >= 7 {
		pe.putInt32(p.LeaderEpoch)
	}

	if err = writeInt32s(p.Replicas); err != nil {
		return err
	}
	if err = writeInt32s(p.Isr); err != nil {
		return err
	}
	if version >= 5 {
		if err = writeInt32s(p.OfflineReplicas); err != nil {
			return err
		}
	}

	if version >= 9 {
		pe.putEmptyTaggedFieldArray()
	}
	return nil
}
// TopicMetadata describes one topic of a MetadataResponse together with its
// partitions. Which fields are read from / written to the wire depends on the
// protocol version handed to decode and encode.
type TopicMetadata struct {
// Version defines the protocol version to use for encode and decode.
// Both decode and encode overwrite it with the version they are called with.
Version int16
// Err contains the topic error, or 0 if there was no error.
Err KError
// Name contains the topic name.
Name string
// Uuid holds the 16 raw bytes that follow the topic name on the wire.
// It is only decoded and encoded when Version >= 10.
Uuid Uuid
// IsInternal contains a True if the topic is internal.
// It is only decoded and encoded when Version >= 1.
IsInternal bool
// Partitions contains each partition in the topic.
Partitions []*PartitionMetadata
// TopicAuthorizedOperations is the int32 that follows the partition list
// on the wire; it is only decoded and encoded when Version >= 8.
TopicAuthorizedOperations int32 // Only valid for Version >= 8
}
// decode fills t from pd using the field layout of the given protocol version.
//
// Fields are read in wire order: error code, name, 16-byte topic ID
// (version >= 10), internal flag (version >= 1), partitions, authorized
// operations (version >= 8). From version 9 on strings and array lengths use
// the compact encoding and the entry ends with a tagged-field section.
// The first decoder error aborts the read and is returned unchanged.
func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
	t.Version = version
	compact := version >= 9

	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	t.Err = KError(code)

	if compact {
		t.Name, err = pd.getCompactString()
	} else {
		t.Name, err = pd.getString()
	}
	if err != nil {
		return err
	}

	if version >= 10 {
		raw, err := pd.getRawBytes(16)
		if err != nil {
			return err
		}
		t.Uuid = [16]byte{}
		for i := range t.Uuid {
			t.Uuid[i] = raw[i]
		}
	}

	if version >= 1 {
		if t.IsInternal, err = pd.getBool(); err != nil {
			return err
		}
	}

	var count int
	if compact {
		count, err = pd.getCompactArrayLength()
	} else {
		count, err = pd.getArrayLength()
	}
	if err != nil {
		return err
	}

	t.Partitions = make([]*PartitionMetadata, count)
	for i := range t.Partitions {
		partition := &PartitionMetadata{}
		if err := partition.decode(pd, version); err != nil {
			return err
		}
		// The slot is only filled once the partition decoded cleanly.
		t.Partitions[i] = partition
	}

	if version >= 8 {
		if t.TopicAuthorizedOperations, err = pd.getInt32(); err != nil {
			return err
		}
	}

	if compact {
		if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
			return err
		}
	}
	return nil
}
// encode writes t to pe using the field layout of the given protocol version.
//
// Fields are written in wire order: error code, name, 16-byte topic ID
// (version >= 10), internal flag (version >= 1), partitions, authorized
// operations (version >= 8). From version 9 on strings and array lengths use
// the compact encoding and an empty tagged-field section is appended.
// The first encoder error is returned unchanged.
func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
	t.Version = version
	compact := version >= 9

	pe.putInt16(int16(t.Err))

	if compact {
		err = pe.putCompactString(t.Name)
	} else {
		err = pe.putString(t.Name)
	}
	if err != nil {
		return err
	}

	if version >= 10 {
		if err = pe.putRawBytes(t.Uuid[:]); err != nil {
			return err
		}
	}

	if version >= 1 {
		pe.putBool(t.IsInternal)
	}

	// Only the non-compact length writer reports an error.
	if compact {
		pe.putCompactArrayLength(len(t.Partitions))
	} else if err = pe.putArrayLength(len(t.Partitions)); err != nil {
		return err
	}

	for _, partition := range t.Partitions {
		if err = partition.encode(pe, version); err != nil {
			return err
		}
	}

	if version >= 8 {
		pe.putInt32(t.TopicAuthorizedOperations)
	}

	if compact {
		pe.putEmptyTaggedFieldArray()
	}
	return nil
}
// MetadataResponse is the response body reported under API key 3 (see key).
// It lists the brokers and topics returned by the responding broker. Which
// fields are read from / written to the wire depends on Version.
type MetadataResponse struct {
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
// It is only decoded and encoded when Version >= 3.
ThrottleTimeMs int32
// Brokers contains each broker in the response.
Brokers []*Broker
// ClusterID contains the cluster ID that responding broker belongs to.
// It is only decoded and encoded when Version >= 2.
ClusterID *string
// ControllerID contains the ID of the controller broker.
// It is only decoded and encoded when Version >= 1.
ControllerID int32
// Topics contains each topic in the response.
Topics []*TopicMetadata
// ClusterAuthorizedOperations is the int32 that follows the topic list on
// the wire; it is only decoded and encoded when Version >= 8.
ClusterAuthorizedOperations int32 // Only valid for Version >= 8
}
// decode fills r from pd using the field layout of the given protocol version.
//
// Fields are read in wire order: throttle time (version >= 3), brokers,
// cluster ID (version >= 2), controller ID (version >= 1), topics, cluster
// authorized operations (version >= 8). From version 9 on strings and array
// lengths use the compact encoding and the body ends with a tagged-field
// section. The first decoder error aborts the read and is returned unchanged.
func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version
	compact := version >= 9

	// arrayLength selects the length encoding that matches the version.
	arrayLength := func() (int, error) {
		if compact {
			return pd.getCompactArrayLength()
		}
		return pd.getArrayLength()
	}

	if version >= 3 {
		if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
			return err
		}
	}

	brokerCount, err := arrayLength()
	if err != nil {
		return err
	}
	r.Brokers = make([]*Broker, brokerCount)
	for i := range r.Brokers {
		broker := new(Broker)
		r.Brokers[i] = broker
		if err = broker.decode(pd, version); err != nil {
			return err
		}
	}

	if version >= 2 {
		if compact {
			r.ClusterID, err = pd.getCompactNullableString()
		} else {
			r.ClusterID, err = pd.getNullableString()
		}
		if err != nil {
			return err
		}
	}

	if version >= 1 {
		if r.ControllerID, err = pd.getInt32(); err != nil {
			return err
		}
	}

	topicCount, err := arrayLength()
	if err != nil {
		return err
	}
	r.Topics = make([]*TopicMetadata, topicCount)
	for i := range r.Topics {
		topic := new(TopicMetadata)
		r.Topics[i] = topic
		if err = topic.decode(pd, version); err != nil {
			return err
		}
	}

	if version >= 8 {
		if r.ClusterAuthorizedOperations, err = pd.getInt32(); err != nil {
			return err
		}
	}

	if compact {
		if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
			return err
		}
	}
	return nil
}
// encode writes r to pe using the field layout selected by r.Version.
//
// Fields are written in wire order: throttle time (version >= 3), brokers,
// cluster ID (version >= 2), controller ID (version >= 1), topics, cluster
// authorized operations (version >= 8). From version 9 on strings and array
// lengths use the compact encoding and an empty tagged-field section is
// appended. The first encoder error is returned unchanged.
func (r *MetadataResponse) encode(pe packetEncoder) (err error) {
	version := r.Version
	compact := version >= 9

	// putLength writes an array length in the encoding that matches the
	// version; only the non-compact writer can report an error.
	putLength := func(n int) error {
		if compact {
			pe.putCompactArrayLength(n)
			return nil
		}
		return pe.putArrayLength(n)
	}

	if version >= 3 {
		pe.putInt32(r.ThrottleTimeMs)
	}

	if err = putLength(len(r.Brokers)); err != nil {
		return err
	}
	for _, broker := range r.Brokers {
		if err = broker.encode(pe, version); err != nil {
			return err
		}
	}

	if version >= 2 {
		if compact {
			err = pe.putNullableCompactString(r.ClusterID)
		} else {
			err = pe.putNullableString(r.ClusterID)
		}
		if err != nil {
			return err
		}
	}

	if version >= 1 {
		pe.putInt32(r.ControllerID)
	}

	if err = putLength(len(r.Topics)); err != nil {
		return err
	}
	for _, topic := range r.Topics {
		if err = topic.encode(pe, version); err != nil {
			return err
		}
	}

	if version >= 8 {
		pe.putInt32(r.ClusterAuthorizedOperations)
	}

	if compact {
		pe.putEmptyTaggedFieldArray()
	}
	return nil
}
// key returns the API key this response type reports, which is always 3.
func (r *MetadataResponse) key() int16 {
	const metadataAPIKey int16 = 3
	return metadataAPIKey
}
// version reports the protocol version currently stored on the response.
func (r *MetadataResponse) version() int16 {
	return r.Version
}
// headerVersion returns the response header version that goes with r.Version:
// 1 for version 9 and later, 0 for everything below.
func (r *MetadataResponse) headerVersion() int16 {
	if r.Version >= 9 {
		return 1
	}
	return 0
}
// isValidVersion reports whether r.Version is a protocol version this type
// can handle.
//
// The accepted range is 0 through 10, matching the versions that decode,
// encode, headerVersion and requiredVersion in this file deal with
// explicitly (the compact/tagged-field layout at 9 and the topic ID at 10).
func (r *MetadataResponse) isValidVersion() bool {
	return r.Version >= 0 && r.Version <= 10
}
// requiredVersion maps r.Version to the KafkaVersion returned for it.
// Versions outside the table (negative or above 10) yield V2_8_0_0.
func (r *MetadataResponse) requiredVersion() KafkaVersion {
	// Indexed by protocol version.
	minimums := [...]KafkaVersion{
		0:  V0_8_2_0,
		1:  V0_10_0_0,
		2:  V0_10_1_0,
		3:  V0_11_0_0,
		4:  V0_11_0_0,
		5:  V1_0_0_0,
		6:  V2_0_0_0,
		7:  V2_1_0_0,
		8:  V2_3_0_0,
		9:  V2_4_0_0,
		10: V2_8_0_0,
	}
	if v := int(r.Version); v >= 0 && v < len(minimums) {
		return minimums[v]
	}
	return V2_8_0_0
}
// throttleTime converts ThrottleTimeMs, a millisecond count, to a time.Duration.
func (r *MetadataResponse) throttleTime() time.Duration {
	millis := time.Duration(r.ThrottleTimeMs)
	return millis * time.Millisecond
}
// testing API
// AddBroker appends a new broker with the given address and ID to r.Brokers.
// No check is made for an existing broker with the same ID.
func (r *MetadataResponse) AddBroker(addr string, id int32) {
	broker := &Broker{addr: addr, id: id}
	r.Brokers = append(r.Brokers, broker)
}
// AddTopic returns the entry in r.Topics named topic, appending a new one if
// none exists yet. In both cases the entry's Err is set to err.
func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
	var tmatch *TopicMetadata
	for _, candidate := range r.Topics {
		if candidate.Name == topic {
			tmatch = candidate
			break
		}
	}

	if tmatch == nil {
		tmatch = new(TopicMetadata)
		tmatch.Name = topic
		r.Topics = append(r.Topics, tmatch)
	}

	tmatch.Err = err
	return tmatch
}
// AddTopicPartition creates or updates the partition with ID partition inside
// the topic named topic.
//
// The topic is obtained through AddTopic with ErrNoError, so its Err is reset
// on every call. The partition's Leader, Replicas, Isr, OfflineReplicas and
// Err are then overwritten with the given values; nil slices are stored as
// empty, non-nil slices.
func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
	tmatch := r.AddTopic(topic, ErrNoError)

	var pmatch *PartitionMetadata
	for _, candidate := range tmatch.Partitions {
		if candidate.ID == partition {
			pmatch = candidate
			break
		}
	}
	if pmatch == nil {
		pmatch = new(PartitionMetadata)
		pmatch.ID = partition
		tmatch.Partitions = append(tmatch.Partitions, pmatch)
	}

	// orEmpty swaps a nil slice for an empty one and passes others through.
	orEmpty := func(ids []int32) []int32 {
		if ids == nil {
			return []int32{}
		}
		return ids
	}

	pmatch.Leader = brokerID
	pmatch.Replicas = orEmpty(replicas)
	pmatch.Isr = orEmpty(isr)
	pmatch.OfflineReplicas = orEmpty(offline)
	pmatch.Err = err
}