in produce_request.go [68:139]
func (r *ProduceRequest) encode(pe packetEncoder) error {
if r.Version >= 3 {
if err := pe.putNullableString(r.TransactionalID); err != nil {
return err
}
}
pe.putInt16(int16(r.RequiredAcks))
pe.putInt32(r.Timeout)
metricRegistry := pe.metricRegistry()
var batchSizeMetric metrics.Histogram
var compressionRatioMetric metrics.Histogram
if metricRegistry != nil {
batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
}
totalRecordCount := int64(0)
err := pe.putArrayLength(len(r.records))
if err != nil {
return err
}
for topic, partitions := range r.records {
err = pe.putString(topic)
if err != nil {
return err
}
err = pe.putArrayLength(len(partitions))
if err != nil {
return err
}
topicRecordCount := int64(0)
var topicCompressionRatioMetric metrics.Histogram
if metricRegistry != nil {
topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
}
for id, records := range partitions {
startOffset := pe.offset()
pe.putInt32(id)
pe.push(&lengthField{})
err = records.encode(pe)
if err != nil {
return err
}
err = pe.pop()
if err != nil {
return err
}
if metricRegistry != nil {
if r.Version >= 3 {
topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric)
} else {
topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric)
}
batchSize := int64(pe.offset() - startOffset)
batchSizeMetric.Update(batchSize)
getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
}
}
if topicRecordCount > 0 {
getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
totalRecordCount += topicRecordCount
}
}
if totalRecordCount > 0 {
metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
}
return nil
}