in consumer.go [685:830]
// parseResponse extracts the messages for this partition from a FetchResponse
// and updates the partition consumer's fetch state as a side effect.
//
// It returns (nil, nil) in two cases:
//   - the broker throttled the request and sent no blocks (only logged);
//   - the block for this partition holds zero records.
//
// In the zero-record case a partial trailing message makes the next fetch
// larger. The fetch size doubles, saturates at MaxInt32 on overflow, and is
// capped at Consumer.Fetch.Max when that is set. If the size is already at
// that cap, ErrMessageTooLarge is sent and child.offset skips one message.
// With no partial message, child.offset jumps past the last batch offset when
// that offset is still below the high watermark.
//
// When records are present it:
//   - resets child.fetchSize to Consumer.Fetch.Default;
//   - stores the block's high watermark atomically;
//   - concatenates the messages of every record set.
//
// Legacy message sets are returned as parsed. Record batches are always
// parsed first, presumably so offsets advance even for batches that end up
// hidden (NOTE(review): confirm in parseRecords). A batch is then hidden if
// it is a control batch, or if it cannot be classified as one under
// ReadUncommitted. Under ReadCommitted, transactional batches from a producer
// with an active aborted transaction are also dropped. When a metric registry
// is configured, the record count is recorded in the "consumer-batch-size"
// histogram.
//
// Errors returned:
//   - ErrIncompleteResponse when the response has no block for this
//     topic/partition;
//   - the block's own error code;
//   - any decode error;
//   - a control-batch classification error under ReadCommitted;
//   - an error for an unrecognised records type.
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var batchSizeHist metrics.Histogram
	if child.consumer != nil && child.consumer.metricRegistry != nil {
		batchSizeHist = getOrRegisterHistogram("consumer-batch-size", child.consumer.metricRegistry)
	}

	// A throttled fetch that carries no blocks is not an error: log it and
	// let the caller poll again.
	throttled := response.ThrottleTime != time.Duration(0)
	if throttled && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	switch {
	case block == nil:
		return nil, ErrIncompleteResponse
	case !errors.Is(block.Err, ErrNoError):
		return nil, block.Err
	}

	recordCount, err := block.numRecords()
	if err != nil {
		return nil, err
	}
	if batchSizeHist != nil {
		batchSizeHist.Update(int64(recordCount))
	}
	if replica := block.PreferredReadReplica; replica != invalidPreferredReplicaID {
		child.preferredReadReplica = replica
	}

	if recordCount == 0 {
		partial, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		switch {
		case partial:
			// A trailing message was cut off by the current fetch size.
			if limit := child.conf.Consumer.Fetch.Max; limit > 0 && child.fetchSize == limit {
				// Already at the configured ceiling: report it and step over
				// that offset so later messages can still be consumed.
				child.sendError(ErrMessageTooLarge)
				child.offset++
			} else {
				// Ask for twice as much next time; a negative product means the
				// int doubling overflowed, so saturate at MaxInt32.
				grown := child.fetchSize * 2
				if grown < 0 {
					grown = math.MaxInt32
				}
				if limit > 0 && grown > limit {
					grown = limit
				}
				child.fetchSize = grown
			}
		case block.LastRecordsBatchOffset != nil && *block.LastRecordsBatchOffset < block.HighWaterMarkOffset:
			// Nothing to hand out, yet the high watermark is still ahead: move
			// past the empty batch instead of re-fetching the same offset.
			lastBatchOffset := *block.LastRecordsBatchOffset
			Logger.Printf("consumer/broker/%d received batch with zero records but high watermark was not reached, topic %s, partition %d, offset %d\n", child.broker.broker.ID(), child.topic, child.partition, lastBatchOffset)
			child.offset = lastBatchOffset + 1
		}
		return nil, nil
	}

	// Records arrived, so any enlargement made for earlier partial fetches is
	// dropped.
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedPIDs holds producer IDs whose transactional batches are hidden
	// under ReadCommitted. An ID is added once iteration reaches a batch whose
	// last offset is at or past the FirstOffset of one of that producer's
	// aborted transactions. It is removed when that producer's abort control
	// record is seen.
	abortedPIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	pendingAborts := block.getAbortedTransactions()

	var out []*ConsumerMessage
	for _, rs := range block.RecordsSet {
		switch rs.recordsType {
		case legacyRecords:
			legacy, err := child.parseMessages(rs.MsgSet)
			if err != nil {
				return nil, err
			}
			out = append(out, legacy...)

		case defaultRecords:
			// Activate every pending aborted transaction that starts at or
			// before the end of this batch. Each one is popped so it is only
			// ever added once.
			for len(pendingAborts) > 0 && pendingAborts[0].FirstOffset <= rs.RecordBatch.LastOffset() {
				abortedPIDs[pendingAborts[0].ProducerID] = struct{}{}
				pendingAborts = pendingAborts[1:]
			}

			// Parse first, then decide whether the result is exposed.
			parsed, err := child.parseRecords(rs.RecordBatch)
			if err != nil {
				return nil, err
			}

			control, err := rs.isControl()
			switch {
			case err != nil && child.conf.Consumer.IsolationLevel == ReadCommitted:
				// Cannot tell whether this is a control batch: fail under
				// ReadCommitted.
				return nil, err
			case err != nil:
				// Same uncertainty under ReadUncommitted: skip the batch.
				continue
			case control:
				// Control batches are never returned. An abort marker closes
				// that producer's aborted transaction.
				marker, err := rs.getControlRecord()
				if err != nil {
					return nil, err
				}
				if marker.Type == ControlRecordAbort {
					delete(abortedPIDs, rs.RecordBatch.ProducerID)
				}
				continue
			}

			// Under ReadCommitted, hide transactional data from a producer
			// whose transaction was aborted.
			if child.conf.Consumer.IsolationLevel == ReadCommitted && rs.RecordBatch.IsTransactional {
				if _, aborted := abortedPIDs[rs.RecordBatch.ProducerID]; aborted {
					continue
				}
			}
			out = append(out, parsed...)

		default:
			return nil, fmt.Errorf("unknown records type: %v", rs.recordsType)
		}
	}
	return out, nil
}