in produce_set.go [35:128]
// add encodes msg and queues it in the set kept for msg.Topic/msg.Partition,
// creating the per-topic map and the per-partition set on first use.
//
// An error is returned, and nothing is queued, when encoding the key or value
// fails, or when idempotence is enabled and msg carries a sequence number lower
// than the first sequence of the batch it would join. On success the buffered
// byte counters of both the partition set and ps grow by a size estimate and
// ps.bufferCount is incremented.
func (ps *produceSet) add(msg *ProducerMessage) error {
	var (
		keyBytes, valBytes []byte
		err                error
	)
	if msg.Key != nil {
		keyBytes, err = msg.Key.Encode()
		if err != nil {
			return err
		}
	}
	if msg.Value != nil {
		valBytes, err = msg.Value.Encode()
		if err != nil {
			return err
		}
	}

	// Fall back to the current time when the message has no timestamp, and
	// keep only millisecond precision.
	ts := msg.Timestamp
	if ts.IsZero() {
		ts = time.Now()
	}
	ts = ts.Truncate(time.Millisecond)

	byPartition := ps.msgs[msg.Topic]
	if byPartition == nil {
		byPartition = make(map[int32]*partitionSet)
		ps.msgs[msg.Topic] = byPartition
	}

	// Record batches are used from V0_11_0_0 onwards; older versions use a
	// legacy message set.
	useRecordBatch := ps.parent.conf.Version.IsAtLeast(V0_11_0_0)

	var added int
	pset := byPartition[msg.Partition]
	if pset == nil {
		if useRecordBatch {
			batch := &RecordBatch{
				FirstTimestamp:   ts,
				Version:          2,
				Codec:            ps.parent.conf.Producer.Compression,
				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
				ProducerID:       ps.producerID,
				ProducerEpoch:    ps.producerEpoch,
			}
			if ps.parent.conf.Producer.Idempotent {
				batch.FirstSequence = msg.sequenceNumber
			}
			pset = &partitionSet{recordsToSend: newDefaultRecords(batch)}
			// A new batch also pays the fixed batch overhead.
			added = recordBatchOverhead
		} else {
			pset = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
		}
		byPartition[msg.Partition] = pset
	}

	if useRecordBatch && ps.parent.conf.Producer.Idempotent &&
		msg.sequenceNumber < pset.recordsToSend.RecordBatch.FirstSequence {
		return errors.New("assertion failed: message out of sequence added to a batch")
	}

	// No error may be returned from here on: the message becomes part of the set.
	pset.msgs = append(pset.msgs, msg)

	if !useRecordBatch {
		legacyMsg := &Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes}
		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
			legacyMsg.Timestamp = ts
			legacyMsg.Version = 1
		}
		pset.recordsToSend.MsgSet.addMessage(legacyMsg)
		added = producerMessageOverhead + len(keyBytes) + len(valBytes)
	} else {
		record := &Record{
			Key:            keyBytes,
			Value:          valBytes,
			TimestampDelta: ts.Sub(pset.recordsToSend.RecordBatch.FirstTimestamp),
		}
		// Deliberately conservative estimate, so the record does not have to
		// be pre-encoded just to learn its size.
		added += maximumRecordOverhead + len(keyBytes) + len(valBytes)
		if n := len(msg.Headers); n > 0 {
			record.Headers = make([]*RecordHeader, n)
			for i := range msg.Headers {
				hdr := &msg.Headers[i]
				record.Headers[i] = hdr
				added += len(hdr.Key) + len(hdr.Value) + 2*binary.MaxVarintLen32
			}
		}
		pset.recordsToSend.RecordBatch.addRecord(record)
	}

	pset.bufferBytes += added
	ps.bufferBytes += added
	ps.bufferCount++
	return nil
}