in lib/ProducerImpl.cc [907:964]
bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
auto messageId = MessageIdBuilder::from(rawMessageId).partition(partition_).build();
Lock lock(mutex_);
if (pendingMessagesQueue_.empty()) {
LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]" //
<< " -- MessageId - " << messageId << "]"
<< "Got an SEND_ACK for expired message, ignoring it.");
return true;
}
auto& op = *pendingMessagesQueue_.front();
if (op.result != ResultOk) {
LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
<< rawMessageId);
return false;
}
uint64_t expectedSequenceId = op.sendArgs->sequenceId;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
<< " expecting: " << expectedSequenceId << " queue size=" //
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
} else if (sequenceId < expectedSequenceId) {
// Ignoring the ack since it's referring to a message that has already timed out.
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
<< " producer: " << producerId_);
return true;
}
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
if (op.chunkMessageIdList) {
// Handling the chunk message id.
op.chunkMessageIdList->push_back(messageId);
if (op.chunkId == op.numChunks - 1) {
auto chunkedMessageId = std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
messageId = chunkedMessageId->build();
}
}
releaseSemaphoreForSendOp(op);
lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
pendingMessagesQueue_.pop_front();
lock.unlock();
try {
opSendMsg->complete(ResultOk, messageId);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
return true;
}