in lib/ProducerImpl.cc [872:923]
bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
auto messageId = MessageIdBuilder::from(rawMessageId).partition(partition_).build();
Lock lock(mutex_);
if (pendingMessagesQueue_.empty()) {
LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]" //
<< " -- MessageId - " << messageId << "]"
<< "Got an SEND_ACK for expired message, ignoring it.");
return true;
}
OpSendMsg op = pendingMessagesQueue_.front();
uint64_t expectedSequenceId = op.sequenceId_;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
<< " expecting: " << expectedSequenceId << " queue size=" //
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
} else if (sequenceId < expectedSequenceId) {
// Ignoring the ack since it's referring to a message that has already timed out.
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
<< " producer: " << producerId_);
return true;
}
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
if (op.chunkedMessageId_) {
// Handling the chunk message id.
if (op.metadata_.chunk_id() == 0) {
op.chunkedMessageId_->setFirstChunkMessageId(messageId);
} else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) {
op.chunkedMessageId_->setLastChunkMessageId(messageId);
messageId = op.chunkedMessageId_->build();
}
}
releaseSemaphoreForSendOp(op);
lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
pendingMessagesQueue_.pop_front();
lock.unlock();
try {
op.complete(ResultOk, messageId);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
return true;
}