bool ProducerImpl::ackReceived()

in lib/ProducerImpl.cc [907:964]


bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
    auto messageId = MessageIdBuilder::from(rawMessageId).partition(partition_).build();
    Lock lock(mutex_);

    if (pendingMessagesQueue_.empty()) {
        LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]"  //
                            << " -- MessageId - " << messageId << "]"
                            << "Got an SEND_ACK for expired message, ignoring it.");
        return true;
    }

    auto& op = *pendingMessagesQueue_.front();
    if (op.result != ResultOk) {
        LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
                                                          << rawMessageId);
        return false;
    }

    uint64_t expectedSequenceId = op.sendArgs->sequenceId;
    if (sequenceId > expectedSequenceId) {
        LOG_WARN(getName() << "Got ack for msg " << sequenceId                        //
                           << " expecting: " << expectedSequenceId << " queue size="  //
                           << pendingMessagesQueue_.size() << " producer: " << producerId_);
        return false;
    } else if (sequenceId < expectedSequenceId) {
        // Ignoring the ack since it's referring to a message that has already timed out.
        LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId  //
                            << " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
                            << " producer: " << producerId_);
        return true;
    }

    // Message was persisted correctly
    LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);

    if (op.chunkMessageIdList) {
        // Handling the chunk message id.
        op.chunkMessageIdList->push_back(messageId);
        if (op.chunkId == op.numChunks - 1) {
            auto chunkedMessageId = std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
            messageId = chunkedMessageId->build();
        }
    }

    releaseSemaphoreForSendOp(op);
    lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;

    std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
    pendingMessagesQueue_.pop_front();

    lock.unlock();
    try {
        opSendMsg->complete(ResultOk, messageId);
    } catch (const std::exception& e) {
        LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
    }
    return true;
}