bool ProducerImpl::ackReceived()

in lib/ProducerImpl.cc [872:923]


bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
    auto messageId = MessageIdBuilder::from(rawMessageId).partition(partition_).build();
    Lock lock(mutex_);

    if (pendingMessagesQueue_.empty()) {
        LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]"  //
                            << " -- MessageId - " << messageId << "]"
                            << "Got an SEND_ACK for expired message, ignoring it.");
        return true;
    }

    OpSendMsg op = pendingMessagesQueue_.front();
    uint64_t expectedSequenceId = op.sequenceId_;
    if (sequenceId > expectedSequenceId) {
        LOG_WARN(getName() << "Got ack for msg " << sequenceId                        //
                           << " expecting: " << expectedSequenceId << " queue size="  //
                           << pendingMessagesQueue_.size() << " producer: " << producerId_);
        return false;
    } else if (sequenceId < expectedSequenceId) {
        // Ignoring the ack since it's referring to a message that has already timed out.
        LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId  //
                            << " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
                            << " producer: " << producerId_);
        return true;
    }

    // Message was persisted correctly
    LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);

    if (op.chunkedMessageId_) {
        // Handling the chunk message id.
        if (op.metadata_.chunk_id() == 0) {
            op.chunkedMessageId_->setFirstChunkMessageId(messageId);
        } else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) {
            op.chunkedMessageId_->setLastChunkMessageId(messageId);
            messageId = op.chunkedMessageId_->build();
        }
    }

    releaseSemaphoreForSendOp(op);
    lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;

    pendingMessagesQueue_.pop_front();

    lock.unlock();
    try {
        op.complete(ResultOk, messageId);
    } catch (const std::exception& e) {
        LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
    }
    return true;
}