lib/ConsumerImpl.cc (1,477 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "ConsumerImpl.h"

#include <pulsar/DeadLetterPolicyBuilder.h>
#include <pulsar/MessageIdBuilder.h>

#include <algorithm>

#include "AckGroupingTracker.h"
#include "AckGroupingTrackerDisabled.h"
#include "AckGroupingTrackerEnabled.h"
#include "BatchMessageAcker.h"
#include "BatchedMessageIdImpl.h"
#include "BitSet.h"
#include "ChunkMessageIdImpl.h"
#include "ClientConnection.h"
#include "ClientImpl.h"
#include "Commands.h"
#include "ExecutorService.h"
#include "GetLastMessageIdResponse.h"
#include "LogUtils.h"
#include "MessageCrypto.h"
#include "MessageIdUtil.h"
#include "MessageImpl.h"
#include "MessagesImpl.h"
#include "ProducerConfigurationImpl.h"
#include "PulsarApi.pb.h"
#include "TimeUtils.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
#include "UnAckedMessageTrackerEnabled.h"
#include "Utils.h"
#include "stats/ConsumerStatsDisabled.h"
#include "stats/ConsumerStatsImpl.h"

namespace pulsar {

DECLARE_LOG_OBJECT()

/**
 * Build a consumer for `topic` / `subscriptionName`.
 *
 * Besides initializing members from `conf`, the constructor selects the un-acked message tracker
 * (enabled only when an un-acked timeout is configured), the stats reporter (enabled only when the
 * client has a non-zero stats interval), the message crypto helper (only when encryption is enabled)
 * and the effective dead letter policy (only when a positive max redeliver count is configured).
 * When no dead letter topic is configured, "<topic>-<subscription>" + DLQ_GROUP_TOPIC_SUFFIX is used.
 */
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
                           const std::string& subscriptionName, const ConsumerConfiguration& conf,
                           bool isPersistent, const ConsumerInterceptorsPtr& interceptors,
                           const ExecutorServicePtr listenerExecutor /* = NULL by default */,
                           bool hasParent /* = false by default */,
                           const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
                           Commands::SubscriptionMode subscriptionMode,
                           boost::optional<MessageId> startMessageId)
    : ConsumerImplBase(
          client, topic,
          Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
                  milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()), milliseconds(0)),
          conf, listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
      waitingForZeroQueueSizeMessage(false),
      config_(conf),
      subscription_(subscriptionName),
      originalSubscriptionName_(subscriptionName),
      isPersistent_(isPersistent),
      messageListener_(config_.getMessageListener()),
      eventListener_(config_.getConsumerEventListener()),
      hasParent_(hasParent),
      consumerTopicType_(consumerTopicType),
      subscriptionMode_(subscriptionMode),
      // This is the initial capacity of the queue
      incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
      availablePermits_(0),
      receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
      consumerId_(client->newConsumerId()),
      consumerName_(config_.getConsumerName()),
      messageListenerRunning_(true),
      negativeAcksTracker_(client, *this, conf),
      readCompacted_(conf.isReadCompacted()),
      startMessageId_(startMessageId),
      maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
      autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
      expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
      interceptors_(interceptors) {
    std::stringstream consumerStrStream;
    consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
    consumerStr_ = consumerStrStream.str();

    // Initialize un-ACKed messages OT tracker.
    if (conf.getUnAckedMessagesTimeoutMs() != 0) {
        if (conf.getTickDurationInMs() > 0) {
            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
                conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
        } else {
            unAckedMessageTrackerPtr_.reset(
                new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
        }
    } else {
        unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
    }

    // Setup stats reporter.
    unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
    if (statsIntervalInSeconds) {
        consumerStatsBasePtr_ = std::make_shared<ConsumerStatsImpl>(
            consumerStr_, client->getIOExecutorProvider()->get(), statsIntervalInSeconds);
    } else {
        consumerStatsBasePtr_ = std::make_shared<ConsumerStatsDisabled>();
    }
    consumerStatsBasePtr_->start();

    // Create msgCrypto
    if (conf.isEncryptionEnabled()) {
        msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
    }

    // Config dlq
    auto deadLetterPolicy = conf.getDeadLetterPolicy();
    if (deadLetterPolicy.getMaxRedeliverCount() > 0) {
        auto deadLetterPolicyBuilder =
            DeadLetterPolicyBuilder()
                .maxRedeliverCount(deadLetterPolicy.getMaxRedeliverCount())
                .initialSubscriptionName(deadLetterPolicy.getInitialSubscriptionName());
        if (deadLetterPolicy.getDeadLetterTopic().empty()) {
            deadLetterPolicyBuilder.deadLetterTopic(topic + "-" + subscriptionName + DLQ_GROUP_TOPIC_SUFFIX);
        } else {
            deadLetterPolicyBuilder.deadLetterTopic(deadLetterPolicy.getDeadLetterTopic());
        }
        deadLetterPolicy_ = deadLetterPolicyBuilder.build();
    }

    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
}

/**
 * Destroy the consumer. If it is still in the Ready state, a best-effort CloseConsumer request is sent
 * (when both the client and the connection are still alive) before shutdown() is called.
 */
ConsumerImpl::~ConsumerImpl() {
    LOG_DEBUG(getName() << "~ConsumerImpl");
    if (state_ == Ready) {
        // this could happen at least in this condition:
        //      consumer seek, caused reconnection, if consumer close happened before connection ready,
        //      then consumer will not send closeConsumer to Broker side, and caused a leak of consumer in
        //      broker.
        LOG_WARN(getName() << "Destroyed consumer which was not properly closed");

        ClientConnectionPtr cnx = getCnx().lock();
        ClientImplPtr client = client_.lock();
        if (client && cnx) {
            int requestId = client->newRequestId();
            cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
            cnx->removeConsumer(consumerId_);
            LOG_INFO(getName() << "Closed consumer for race condition: " << consumerId_);
        } else {
            LOG_WARN(getName() << "Client is destroyed and cannot send the CloseConsumer command");
        }
    }
    shutdown();
}

/** Store the partition index that is later passed to the consumer event listener. */
void ConsumerImpl::setPartitionIndex(int partitionIndex) { partitionIndex_ = partitionIndex; }

/** @return the partition index previously stored with setPartitionIndex(). */
int ConsumerImpl::getPartitionIndex() { return partitionIndex_; }

/** @return the consumer id obtained from the client in the constructor. */
uint64_t ConsumerImpl::getConsumerId() { return consumerId_; }

/** @return the future that handleCreateConsumer()/connectionFailed() complete. */
Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture() {
    return consumerCreatedPromise_.getFuture();
}

/** @return the subscription name exactly as it was passed to the constructor. */
const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }

/** @return the topic this consumer was created for. */
const std::string& ConsumerImpl::getTopic() const { return *topic_; }

/**
 * Start the handler and create the ACK grouping tracker.
 *
 * The tracker flavour depends on the topic and configuration: grouping enabled (persistent topic with
 * a positive ack grouping time), grouping disabled (persistent topic otherwise), or the base
 * AckGroupingTracker for non-persistent topics.
 */
void ConsumerImpl::start() {
    HandlerBase::start();

    // The suppliers only hold a weak reference so that the tracker does not keep the consumer alive.
    std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
    auto connectionSupplier = [weakSelf]() -> ClientConnectionPtr {
        auto self = weakSelf.lock();
        if (!self) {
            return nullptr;
        }
        return self->getCnx().lock();
    };

    // NOTE: start() is always called in `ClientImpl`'s method, so lock() returns not null
    const auto requestIdGenerator = client_.lock()->getRequestIdGenerator();
    const auto requestIdSupplier = [requestIdGenerator] { return (*requestIdGenerator)++; };

    // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
    // constructor completed.
    if (TopicName::get(*topic_)->isPersistent()) {
        if (config_.getAckGroupingTimeMs() > 0) {
            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
                connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(),
                config_.getAckGroupingTimeMs(), config_.getAckGroupingMaxSize(),
                client_.lock()->getIOExecutorProvider()->get()));
        } else {
            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(
                connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled()));
        }
    } else {
        LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
        ackGroupingTrackerPtr_.reset(new AckGroupingTracker(connectionSupplier, requestIdSupplier,
                                                            consumerId_, config_.isAckReceiptEnabled()));
    }
    ackGroupingTrackerPtr_->start();
}

/** Unregister this consumer from the connection that is about to be replaced. */
void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { cnx.removeConsumer(consumerId_); }

/** Forward the set of negatively acknowledged message ids to the interceptors. */
void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) {
    interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), messageIds);
}

/**
 * Called when a connection is available: registers the consumer on it, resets the receive queue and
 * the un-acked tracker, and sends the Subscribe request. The response is handled by
 * handleCreateConsumer().
 */
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
    if (state_ == Closed) {
        LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
        return;
    }

    // The client is only weakly referenced; without it no request id can be generated, so there is
    // nothing useful to do on this connection.
    ClientImplPtr client = client_.lock();
    if (!client) {
        LOG_WARN(getName() << "Client is destroyed and cannot send the Subscribe command");
        return;
    }

    // Register consumer so that we can handle other incoming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
    // sending the subscribe request.
    cnx->registerConsumer(consumerId_, get_shared_this_ptr());

    if (duringSeek_) {
        ackGroupingTrackerPtr_->flushAndClean();
    }

    Lock lockForMessageId(mutexForMessageId_);
    // Update startMessageId so that we can discard messages after delivery restarts
    const auto startMessageId = clearReceiveQueue();
    // Only non-durable subscriptions send the start message id with the subscribe request.
    const auto subscribeMessageId =
        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
    startMessageId_ = startMessageId;
    lockForMessageId.unlock();

    unAckedMessageTrackerPtr_->clear();

    uint64_t requestId = client->newRequestId();
    SharedBuffer cmd = Commands::newSubscribe(
        *topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
        subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
        config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
        config_.getKeySharedPolicy(), config_.getPriorityLevel());
    cnx->sendRequestWithId(cmd, requestId)
        .addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx,
                               std::placeholders::_1));
}

/**
 * Called when no connection could be established. Fails the creation promise; the state becomes
 * Failed only if this call is the one that completed the promise.
 */
void ConsumerImpl::connectionFailed(Result result) {
    // Keep a reference to ensure object is kept alive
    auto ptr = get_shared_this_ptr();

    if (consumerCreatedPromise_.setFailed(result)) {
        state_ = Failed;
    }
}

/** Send a Flow command with `numMessages` permits. No-op when `cnx` is null or `numMessages` <= 0. */
void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) {
    if (cnx && numMessages > 0) {
        LOG_DEBUG(getName() << "Send more permits: " << numMessages);
        SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned int>(numMessages));
        cnx->sendCommand(cmd);
    }
}

/**
 * Completion handler of the Subscribe request sent by connectionOpened().
 *
 * On success the consumer becomes Ready, local queues are reset and the initial flow permits are sent.
 * On failure the consumer either schedules a reconnection or fails the creation promise.
 */
void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
    if (result == ResultOk) {
        LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
        {
            Lock lock(mutex_);
            setCnx(cnx);
            incomingMessages_.clear();
            possibleSendToDeadLetterTopicMessages_.clear();
            state_ = Ready;
            backoff_.reset();
            // Complicated logic since we don't have a isLocked() function for mutex
            if (waitingForZeroQueueSizeMessage) {
                sendFlowPermitsToBroker(cnx, 1);
            }
            availablePermits_ = 0;
        }

        LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
        // A function-local static "firstTime" flag used to gate this block, but it was always cleared
        // before being read, so the gate was always open. It was removed because it was unsynchronized
        // state shared by every consumer instance.
        if (config_.getReceiverQueueSize() != 0) {
            sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
        } else if (messageListener_) {
            sendFlowPermitsToBroker(cnx, 1);
        }
        consumerCreatedPromise_.setValue(get_shared_this_ptr());
    } else {
        if (result == ResultTimeout) {
            // Creating the consumer has timed out. We need to ensure the broker closes the consumer
            // in case it was indeed created, otherwise it might prevent new subscribe operation,
            // since we are not closing the connection
            ClientImplPtr client = client_.lock();
            if (client) {
                int requestId = client->newRequestId();
                cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
            } else {
                LOG_WARN(getName() << "Client is destroyed and cannot send the CloseConsumer command");
            }
        }

        if (consumerCreatedPromise_.isComplete()) {
            // Consumer had already been initially created, we need to retry connecting in any case
            LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
            scheduleReconnection(get_shared_this_ptr());
        } else {
            // Consumer was not yet created, retry to connect to broker if it's possible
            result = convertToTimeoutIfNecessary(result, creationTimestamp_);
            if (result == ResultRetryable) {
                LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
                scheduleReconnection(get_shared_this_ptr());
            } else {
                LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
                consumerCreatedPromise_.setFailed(result);
                state_ = Failed;
            }
        }
    }
}

/**
 * Send an Unsubscribe request and report the outcome through `originalCallback` (which may be empty).
 *
 * - Not Ready: reports ResultAlreadyClosed and leaves the state untouched.
 * - No connection or client: reports ResultNotConnected.
 * - Otherwise: on ResultOk the consumer is shut down; on failure the state is put back to Ready.
 */
void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
    LOG_INFO(getName() << "Unsubscribing");

    if (state_ != Ready) {
        // Do not go through the failure path below: it resets state_ to Ready, which would revive a
        // consumer that is closed, closing or failed.
        const Result result = ResultAlreadyClosed;
        LOG_WARN(getName() << "Failed to unsubscribe: " << result);
        if (originalCallback) {
            originalCallback(result);
        }
        return;
    }

    auto callback = [this, originalCallback](Result result) {
        if (result == ResultOk) {
            shutdown();
            LOG_INFO(getName() << "Unsubscribed successfully");
        } else {
            state_ = Ready;
            LOG_WARN(getName() << "Failed to unsubscribe: " << result);
        }
        if (originalCallback) {
            originalCallback(result);
        }
    };

    Lock lock(mutex_);

    ClientConnectionPtr cnx = getCnx().lock();
    ClientImplPtr client = client_.lock();
    if (cnx && client) {
        LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_);
        lock.unlock();
        int requestId = client->newRequestId();
        SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
        // `self` keeps the consumer alive until the response arrives, because `callback` captures `this`.
        auto self = get_shared_this_ptr();
        cnx->sendRequestWithId(cmd, requestId)
            .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
    } else {
        Result result = ResultNotConnected;
        lock.unlock();
        LOG_WARN(getName() << "Failed to unsubscribe: " << strResult(result));
        callback(result);
    }
}

/**
 * Dispose of one chunk of a discarded chunked message: acknowledge it when `autoAck` is true,
 * otherwise hand it to trackMessage().
 */
void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck) {
    if (autoAck) {
        acknowledgeAsync(messageId, [uuid, messageId](Result result) {
            if (result != ResultOk) {
                LOG_WARN("Failed to acknowledge discarded chunk, uuid: " << uuid
                                                                         << ", messageId: " << messageId);
            }
        });
    } else {
        trackMessage(messageId);
    }
}

/**
 * Arm the timer that removes (and acknowledges the chunks of) incomplete chunked messages older than
 * expireTimeOfIncompleteChunkedMessageMs_. The handler re-arms the timer after every run.
 */
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
    checkExpiredChunkedTimer_->expires_from_now(
        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
        // `this` is only used after the weak pointer has been locked successfully.
        auto self = weakSelf.lock();
        if (!self) {
            return;
        }
        if (ec) {
            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
                                << "].");
            return;
        }
        Lock lock(chunkProcessMutex_);
        long currentTimeMs = TimeUtils::currentTimeMillis();
        chunkedMessageCache_.removeOldestValuesIf(
            [this, currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
                bool expired =
                    currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
                if (!expired) {
                    return false;
                }
                for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
                    LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
                    discardChunkMessages(uuid, msgId, true);
                }
                return true;
            });
        triggerCheckExpiredChunkedTimer();
        return;
    });
}

/**
 * Add one chunk to the chunked-message cache.
 *
 * @param messageId in/out: on completion it is replaced by the chunk message id built from the first
 *        and last chunk ids.
 * @return the whole payload once the last chunk has arrived and decompression succeeded; boost::none
 *         when the message is still incomplete, the chunk is unknown/invalid, or decompression failed.
 */
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
                                                                const proto::MessageMetadata& metadata,
                                                                const proto::MessageIdData& messageIdData,
                                                                const ClientConnectionPtr& cnx,
                                                                MessageId& messageId) {
    const auto chunkId = metadata.chunk_id();
    const auto uuid = metadata.uuid();
    LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
                                                 << ", messageId: " << messageId << ") of "
                                                 << payload.readableBytes() << " bytes");

    Lock lock(chunkProcessMutex_);

    // Lazy task scheduling to expire incomplete chunk message
    bool expected = false;
    if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
        expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) {
        triggerCheckExpiredChunkedTimer();
    }

    auto it = chunkedMessageCache_.find(uuid);

    if (chunkId == 0 && it == chunkedMessageCache_.end()) {
        // First chunk of a new message: make room by evicting the oldest pending messages if needed.
        if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
            chunkedMessageCache_.removeOldestValues(
                chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
                [this](const std::string& uuid, const ChunkedMessageCtx& ctx) {
                    for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
                        discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
                    }
                });
        }
        it = chunkedMessageCache_.putIfAbsent(
            uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
    }

    // The iterator must be checked before it is dereferenced: a non-first chunk of an unknown message
    // leaves `it` at end().
    if (it == chunkedMessageCache_.end()) {
        LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
                                                       << ", messageId: " << messageId << ")");
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    auto& chunkedMsgCtx = it->second;
    if (!chunkedMsgCtx.validateChunkId(chunkId)) {
        LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
                  << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
        chunkedMessageCache_.remove(uuid);
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    chunkedMsgCtx.appendChunk(messageId, payload);
    if (!chunkedMsgCtx.isCompleted()) {
        lock.unlock();
        increaseAvailablePermits(cnx);
        return boost::none;
    }

    ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
    chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
    chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
    messageId = chunkMsgId->build();

    LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
                                                    << ", sequenceId: " << metadata.sequence_id());

    // Take the payload before the context is removed from the cache.
    auto wholePayload = chunkedMsgCtx.getBuffer();
    chunkedMessageCache_.remove(uuid);
    if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
        return wholePayload;
    } else {
        return boost::none;
    }
}

/**
 * Entry point for a message delivered by the broker on `cnx`.
 *
 * The payload is decrypted, checksum-checked and decompressed as needed, chunked messages are
 * reassembled, duplicates are dropped, and the resulting message (or every message of a batch) is
 * handed to executeNotifyCallback(). When a message listener is configured, one listener task is
 * posted per delivered message.
 */
void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
                                   bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
                                   proto::MessageMetadata& metadata, SharedBuffer& payload) {
    LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());

    if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
        // Message was discarded or not consumed due to decryption failure
        return;
    }

    if (!isChecksumValid) {
        // Message discarded for checksum error
        discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
        return;
    }

    auto redeliveryCount = msg.redelivery_count();
    const bool isMessageUndecryptable =
        metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() &&
        config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;

    const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
    // Chunks are decompressed only after reassembly (see processMessageChunk()).
    if (!isMessageUndecryptable && !isChunkedMessage) {
        if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
            // Message was discarded on decompression error
            return;
        }
    }

    const auto& messageIdData = msg.message_id();
    auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build();

    // Only a non-batched messages can be a chunk
    if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
        auto optionalPayload = processMessageChunk(payload, metadata, messageIdData, cnx, messageId);
        if (optionalPayload) {
            payload = optionalPayload.value();
        } else {
            return;
        }
    }

    Message m(messageId, brokerEntryMetadata, metadata, payload);
    m.impl_->cnx_ = cnx.get();
    m.impl_->setTopicName(topic_);
    m.impl_->setRedeliveryCount(msg.redelivery_count());

    if (metadata.has_schema_version()) {
        m.impl_->setSchemaVersion(metadata.schema_version());
    }

    LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
    LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
                        << metadata.has_num_messages_in_batch());

    uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
    if (this->ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
        LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer.");
        increaseAvailablePermits(cnx, numOfMessageReceived);
        return;
    }

    if (metadata.has_num_messages_in_batch()) {
        // Copy the ack set words sent by the broker into a BitSet.
        BitSet::Data words(msg.ack_set_size());
        for (int i = 0; i < msg.ack_set_size(); i++) {
            words[i] = msg.ack_set(i);
        }
        BitSet ackSet{std::move(words)};
        Lock lock(mutex_);
        numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count());
    } else {
        // try convert key value data.
        m.impl_->convertPayloadToKeyValue(config_.getSchema());

        const auto startMessageId = startMessageId_.get();
        if (isPersistent_ && startMessageId &&
            m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
            m.getMessageId().entryId() == startMessageId.value().entryId() &&
            isPriorEntryIndex(m.getMessageId().entryId())) {
            LOG_DEBUG(getName() << " Ignoring message from before the startMessageId: "
                                << startMessageId.value());
            return;
        }
        if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
            possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
            if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
                redeliverUnacknowledgedMessages({m.getMessageId()});
                increaseAvailablePermits(cnx);
                return;
            }
        }
        executeNotifyCallback(m);
    }

    if (messageListener_) {
        if (!messageListenerRunning_) {
            return;
        }
        // Trigger message listener callback in a separate thread
        while (numOfMessageReceived--) {
            listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
        }
    }
}

/** Post an active/inactive notification to the listener executor when an event listener is set. */
void ConsumerImpl::activeConsumerChanged(bool isActive) {
    if (eventListener_) {
        listenerExecutor_->postWork(
            std::bind(&ConsumerImpl::internalConsumerChangeListener, get_shared_this_ptr(), isActive));
    }
}

/** Invoke the event listener; std::exception thrown by user code is logged and swallowed. */
void ConsumerImpl::internalConsumerChangeListener(bool isActive) {
    try {
        if (isActive) {
            eventListener_->becameActive(Consumer(get_shared_this_ptr()), partitionIndex_);
        } else {
            eventListener_->becameInactive(Consumer(get_shared_this_ptr()), partitionIndex_);
        }
    } catch (const std::exception& e) {
        LOG_ERROR(getName() << "Exception thrown from event listener " << e.what());
    }
}

/** Complete every pending receiveAsync() callback with ResultAlreadyClosed and an empty message. */
void ConsumerImpl::failPendingReceiveCallback() {
    Message msg;
    Lock lock(pendingReceiveMutex_);
    while (!pendingReceives_.empty()) {
        ReceiveCallback callback = pendingReceives_.front();
        pendingReceives_.pop();
        listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
    }
    lock.unlock();
}

/**
 * Deliver one message: to the oldest pending receiveAsync() callback if there is one, otherwise to
 * the incoming queue, and then complete pending batch receives if enough messages are available.
 */
void ConsumerImpl::executeNotifyCallback(Message& msg) {
    Lock lock(pendingReceiveMutex_);
    // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
    bool asyncReceivedWaiting = !pendingReceives_.empty();
    ReceiveCallback callback;
    if (asyncReceivedWaiting) {
        callback = pendingReceives_.front();
        pendingReceives_.pop();
    }
    lock.unlock();

    // has pending receive, direct callback.
    if (asyncReceivedWaiting) {
        listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
                                              get_shared_this_ptr(), ResultOk, msg, callback));
        return;
    }

    // try to add incoming messages.
    // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
    if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) {
        incomingMessages_.push(msg);
        incomingMessagesSize_.fetch_add(msg.getLength());
    }

    // try trigger pending batch messages
    Lock batchOptionLock(batchReceiveOptionMutex_);
    if (hasEnoughMessagesForBatchReceive()) {
        ConsumerImplBase::notifyBatchPendingReceivedCallback();
    }
}

/**
 * Drain as many queued messages as the batch receive policy allows and post `callback` with them on
 * the listener executor.
 */
void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
    auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
                                                   batchReceivePolicy_.getMaxNumBytes());
    Message msg;
    while (incomingMessages_.popIf(
        msg, [&messages](const Message& peekMsg) { return messages->canAdd(peekMsg); })) {
        messageProcessed(msg);
        Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
        messages->add(interceptMsg);
    }
    auto self = get_shared_this_ptr();
    listenerExecutor_->postWork(
        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
}

/**
 * Run a receiveAsync() callback. For successful deliveries with a non-zero receiver queue the message
 * is first accounted for, passed through the interceptors and added to the un-acked tracker.
 */
void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
                                                 const ReceiveCallback& callback) {
    if (result == ResultOk && config_.getReceiverQueueSize() != 0) {
        messageProcessed(msg);
        msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
        unAckedMessageTrackerPtr_->add(msg.getMessageId());
    }
    callback(result, msg);
}

// Zero Queue size is not supported with Batch Messages
/**
 * Split a batch into individual messages and deliver each one through executeNotifyCallback().
 *
 * Messages are skipped when they exceeded the max redeliver count, precede the start message id, or
 * are marked as already acknowledged in `ackSet`; permits are returned for the skipped ones.
 *
 * @return the number of messages of the batch that were actually delivered.
 */
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
                                                          Message& batchedMessage, const BitSet& ackSet,
                                                          int redeliveryCount) {
    auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
    LOG_DEBUG("Received Batch messages of size - " << batchSize
                                                   << " -- msgId: " << batchedMessage.getMessageId());
    const auto startMessageId = startMessageId_.get();

    int skippedMessages = 0;

    auto acker = BatchMessageAckerImpl::create(batchSize);
    std::vector<Message> possibleToDeadLetter;
    for (int i = 0; i < batchSize; i++) {
        // This is a cheap copy since message contains only one shared pointer (impl_)
        Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
        msg.impl_->setRedeliveryCount(redeliveryCount);
        msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
        msg.impl_->convertPayloadToKeyValue(config_.getSchema());
        if (msg.impl_->brokerEntryMetadata.has_index()) {
            msg.impl_->brokerEntryMetadata.set_index(msg.impl_->brokerEntryMetadata.index() - batchSize + i +
                                                     1);
        }

        if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
            possibleToDeadLetter.emplace_back(msg);
            if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
                skippedMessages++;
                continue;
            }
        }

        if (startMessageId) {
            const MessageId& msgId = msg.getMessageId();

            // If we are receiving a batch message, we need to discard messages that were prior
            // to the startMessageId
            if (isPersistent_ && msgId.ledgerId() == startMessageId.value().ledgerId() &&
                msgId.entryId() == startMessageId.value().entryId() &&
                isPriorBatchIndex(msgId.batchIndex())) {
                LOG_DEBUG(getName() << "Ignoring message from before the startMessageId"
                                    << msg.getMessageId());
                ++skippedMessages;
                continue;
            }
        }

        if (!ackSet.isEmpty() && !ackSet.get(i)) {
            LOG_DEBUG(getName() << "Ignoring message from " << i
                                << "th message, which has been acknowledged");
            ++skippedMessages;
            continue;
        }

        executeNotifyCallback(msg);
    }

    if (!possibleToDeadLetter.empty()) {
        possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
        if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
            redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
        }
    }

    if (skippedMessages > 0) {
        increaseAvailablePermits(cnx, skippedMessages);
    }

    return batchSize - skippedMessages;
}

/**
 * Decrypt `payload` in place when the metadata carries encryption keys.
 *
 * @return true when the message should continue to be processed (not encrypted, decrypted, or the
 *         crypto failure action is CONSUME); false when it was discarded or left for redelivery.
 */
bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
                                          const proto::MessageMetadata& metadata, SharedBuffer& payload) {
    if (!metadata.encryption_keys_size()) {
        return true;
    }

    // If KeyReader is not configured throw exception based on config param
    if (!config_.isEncryptionEnabled()) {
        if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
            LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message.");
            return true;
        } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
            LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
                                  "is set to discard");
            discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
        } else {
            LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to "
                                   "consume encrypted message");
            auto messageId = MessageIdBuilder::from(msg.message_id()).build();
            unAckedMessageTrackerPtr_->add(messageId);
        }
        return false;
    }

    SharedBuffer decryptedPayload;
    if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
        payload = decryptedPayload;
        return true;
    }

    if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
        // Note, batch message will fail to consume even if config is set to consume
        LOG_WARN(
            getName() << "Decryption failed. Consuming encrypted message since config is set to consume.");
        return true;
    } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
        LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
        discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
    } else {
        LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message");
        auto messageId = MessageIdBuilder::from(msg.message_id()).build();
        unAckedMessageTrackerPtr_->add(messageId);
    }
    return false;
}

/**
 * Decompress `payload` in place when the metadata declares a compression type.
 *
 * @param checkMaxMessageSize when true, a payload larger than the connection's max message size is
 *        treated as corrupted and discarded.
 * @return false when there is no connection or the message was discarded; true otherwise.
 */
bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,
                                             const proto::MessageIdData& messageIdData,
                                             const proto::MessageMetadata& metadata, SharedBuffer& payload,
                                             bool checkMaxMessageSize) {
    if (!metadata.has_compression()) {
        return true;
    }

    CompressionType compressionType = static_cast<CompressionType>(metadata.compression());

    uint32_t uncompressedSize = metadata.uncompressed_size();
    uint32_t payloadSize = payload.readableBytes();
    if (cnx) {
        if (checkMaxMessageSize && payloadSize > ClientConnection::getMaxMessageSize()) {
            // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
            LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize  //
                                << " at " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
            discardCorruptedMessage(cnx, messageIdData,
                                    CommandAck_ValidationError_UncompressedSizeCorruption);
            return false;
        }
    } else {
        LOG_ERROR("Connection not ready for Consumer - " << getConsumerId());
        return false;
    }

    if (!CompressionCodecProvider::getCodec(compressionType).decode(payload, uncompressedSize, payload)) {
        LOG_ERROR(getName() << "Failed to decompress message with " << uncompressedSize  //
                            << " at " << messageIdData.ledgerid() << ":" << messageIdData.entryid());
        discardCorruptedMessage(cnx, messageIdData, CommandAck_ValidationError_DecompressionError);
        return false;
    }

    return true;
}

/**
 * Acknowledge a corrupted message individually with the given validation error and give one permit
 * back.
 */
void ConsumerImpl::discardCorruptedMessage(const ClientConnectionPtr& cnx,
                                           const proto::MessageIdData& messageId,
                                           CommandAck_ValidationError validationError) {
    LOG_ERROR(getName() << "Discarding corrupted message at " << messageId.ledgerid() << ":"
                        << messageId.entryid());

    SharedBuffer cmd = Commands::newAck(consumerId_, messageId.ledgerid(), messageId.entryid(), {},
                                        CommandAck_AckType_Individual, validationError);

    cnx->sendCommand(cmd);
    increaseAvailablePermits(cnx);
}

/**
 * Listener task: pop one queued message (without blocking) and pass it to the user's message
 * listener. std::exception thrown by the listener is logged and swallowed.
 */
void ConsumerImpl::internalListener() {
    if (!messageListenerRunning_) {
        return;
    }
    Message msg;
    if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
        // This will only happen when the connection got reset and we cleared the queue
        return;
    }
    trackMessage(msg.getMessageId());
    try {
        consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
        {
            // messageProcessed() writes this member under mutexForMessageId_; use the same lock here.
            Lock lock(mutexForMessageId_);
            lastDequedMessageId_ = msg.getMessageId();
        }
        Consumer consumer{get_shared_this_ptr()};
        Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
        messageListener_(consumer, interceptMsg);
    } catch (const std::exception& e) {
        LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
    }
    messageProcessed(msg, false);
}

/**
 * Blocking receive for a consumer configured with receiver queue size 0: requests a single permit and
 * waits for a message that arrived on the connection that was current when the call started.
 *
 * @return ResultInvalidConfiguration when the queue size is not 0, ResultInterrupted when the queue
 *         pop fails, ResultOk otherwise.
 */
Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
    if (config_.getReceiverQueueSize() != 0) {
        LOG_ERROR(getName() << " Can't use receiveForZeroQueueSize if the queue size is not 0");
        return ResultInvalidConfiguration;
    }

    // Using RAII for locking
    ClientConnectionPtr currentCnx = getCnx().lock();
    Lock lock(mutexForReceiveWithZeroQueueSize);

    // Just being cautious
    if (incomingMessages_.size() != 0) {
        LOG_ERROR(
            getName() << "The incoming message queue should never be greater than 0 when Queue size is 0");
        incomingMessages_.clear();
    }
    waitingForZeroQueueSizeMessage = true;

    sendFlowPermitsToBroker(currentCnx, 1);

    while (true) {
        if (!incomingMessages_.pop(msg)) {
            return ResultInterrupted;
        }

        {
            // Lock needed to prevent race between connectionOpened and the check "msg.impl_->cnx_ ==
            // currentCnx.get())"
            Lock localLock(mutex_);
            // if message received due to an old flow - discard it and wait for the message from the
            // latest flow command
            if (msg.impl_->cnx_ == currentCnx.get()) {
                waitingForZeroQueueSizeMessage = false;
                // Can't use break here else it may trigger a race with connection opened.

                localLock.unlock();
                msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
                return ResultOk;
            }
        }
    }
}

/** Blocking receive; the outcome is also reported to the stats collector. */
Result ConsumerImpl::receive(Message& msg) {
    Result res = receiveHelper(msg);
    consumerStatsBasePtr_->receivedMessage(msg, res);
    return res;
}

/**
 * Asynchronous receive: completes `callback` immediately when a message is queued, otherwise enqueues
 * the callback (requesting one permit when the receiver queue size is 0).
 */
void ConsumerImpl::receiveAsync(ReceiveCallback callback) {
    Message msg;

    // fail the callback if consumer is closing or closed
    if (state_ != Ready) {
        callback(ResultAlreadyClosed, msg);
        return;
    }

    Lock lock(pendingReceiveMutex_);
    if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
        lock.unlock();
        messageProcessed(msg);
        msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
        callback(ResultOk, msg);
    } else {
        pendingReceives_.push(callback);
        lock.unlock();

        if (config_.getReceiverQueueSize() == 0) {
            sendFlowPermitsToBroker(getCnx().lock(), 1);
        }
    }
}

/**
 * Implementation of the blocking receive().
 *
 * @return ResultAlreadyClosed when not Ready, ResultInvalidConfiguration when a listener is set,
 *         ResultInterrupted when the queue pop fails, ResultOk otherwise.
 */
Result ConsumerImpl::receiveHelper(Message& msg) {
    if (state_ != Ready) {
        return ResultAlreadyClosed;
    }

    if (messageListener_) {
        LOG_ERROR(getName() << "Can not receive when a listener has been set");
        return ResultInvalidConfiguration;
    }

    if (config_.getReceiverQueueSize() == 0) {
        return fetchSingleMessageFromBroker(msg);
    }

    if (!incomingMessages_.pop(msg)) {
        return ResultInterrupted;
    }

    messageProcessed(msg);
    msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
    return ResultOk;
}

/** Receive with a timeout in milliseconds; the outcome is also reported to the stats collector. */
Result ConsumerImpl::receive(Message& msg, int timeout) {
    Result res = receiveHelper(msg, timeout);
    consumerStatsBasePtr_->receivedMessage(msg, res);
    return res;
}

/**
 * Implementation of the timed receive().
 *
 * @return ResultInvalidConfiguration when the queue size is 0 or a listener is set,
 *         ResultAlreadyClosed when the consumer is not Ready (before or after waiting),
 *         ResultTimeout when nothing arrived within `timeout` milliseconds, ResultOk otherwise.
 */
Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
    if (config_.getReceiverQueueSize() == 0) {
        LOG_WARN(getName() << "Can't use this function if the queue size is 0");
        return ResultInvalidConfiguration;
    }

    if (state_ != Ready) {
        return ResultAlreadyClosed;
    }

    if (messageListener_) {
        LOG_ERROR(getName() << "Can not receive when a listener has been set");
        return ResultInvalidConfiguration;
    }

    if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
        messageProcessed(msg);
        msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
        return ResultOk;
    } else {
        if (state_ != Ready) {
            return ResultAlreadyClosed;
        }
        return ResultTimeout;
    }
}

/**
 * Bookkeeping after a message left the incoming queue: records it as the last dequeued message,
 * updates the queued byte count and, unless the message came from a different connection than the
 * current one, gives a permit back and optionally tracks the message.
 */
void ConsumerImpl::messageProcessed(Message& msg, bool track) {
    Lock lock(mutexForMessageId_);
    lastDequedMessageId_ = msg.getMessageId();
    lock.unlock();

    incomingMessagesSize_.fetch_sub(msg.getLength());

    ClientConnectionPtr currentCnx = getCnx().lock();
    if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
        LOG_DEBUG(getName() << "Not adding permit since connection is different.");
        return;
    }

    increaseAvailablePermits(currentCnx);
    if (track) {
        trackMessage(msg.getMessageId());
    }
}

/**
 * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that
 * was
 * not seen by the application
 */
boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
    bool expectedDuringSeek = true;
    if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
        return seekMessageId_.get();
    } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
        return startMessageId_.get();
    }

    Message nextMessageInQueue;
    if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
        // There was at least one message pending in the queue
        const MessageId& nextMessageId = nextMessageInQueue.getMessageId();
        auto previousMessageId = (nextMessageId.batchIndex() >= 0) ?
MessageIdBuilder() .ledgerId(nextMessageId.ledgerId()) .entryId(nextMessageId.entryId()) .batchIndex(nextMessageId.batchIndex() - 1) .batchSize(nextMessageId.batchSize()) .build() : MessageIdBuilder() .ledgerId(nextMessageId.ledgerId()) .entryId(nextMessageId.entryId() - 1) .build(); return previousMessageId; } else if (lastDequedMessageId_ != MessageId::earliest()) { // If the queue was empty we need to restart from the message just after the last one that has been // dequeued // in the past return lastDequedMessageId_; } else { // No message was received or dequeued by this consumer. Next message would still be the // startMessageId return startMessageId_.get(); } } void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta) { int newAvailablePermits = availablePermits_.fetch_add(delta) + delta; while (newAvailablePermits >= receiverQueueRefillThreshold_ && messageListenerRunning_) { if (availablePermits_.compare_exchange_weak(newAvailablePermits, 0)) { sendFlowPermitsToBroker(currentCnx, newAvailablePermits); break; } } } inline CommandSubscribe_SubType ConsumerImpl::getSubType() { ConsumerType type = config_.getConsumerType(); switch (type) { case ConsumerExclusive: return CommandSubscribe_SubType_Exclusive; case ConsumerShared: return CommandSubscribe_SubType_Shared; case ConsumerFailover: return CommandSubscribe_SubType_Failover; case ConsumerKeyShared: return CommandSubscribe_SubType_Key_Shared; } BOOST_THROW_EXCEPTION(std::logic_error("Invalid ConsumerType enumeration value")); } inline CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition() { InitialPosition initialPosition = config_.getSubscriptionInitialPosition(); switch (initialPosition) { case InitialPositionLatest: return CommandSubscribe_InitialPosition_Latest; case InitialPositionEarliest: return CommandSubscribe_InitialPosition_Earliest; } BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition enumeration value")); } void 
ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { auto pair = prepareIndividualAck(msgId); const auto& msgIdToAck = pair.first; const bool readyToAck = pair.second; if (readyToAck) { ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck, callback); } else { if (callback) { callback(ResultOk); } } interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, msgId); } void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) { MessageIdList messageIdListToAck; // TODO: Need to check if the consumer is ready. Same to all other public methods for (auto&& msgId : messageIdList) { auto pair = prepareIndividualAck(msgId); const auto& msgIdToAck = pair.first; const bool readyToAck = pair.second; if (readyToAck) { messageIdListToAck.emplace_back(msgIdToAck); } // Invoking `onAcknowledge` for all message ids no matter if it's ready to ack. This is consistent // with the Java client. interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, msgId); } this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck, callback); } void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) { interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), ResultCumulativeAcknowledgementNotAllowedError, msgId); if (callback) { callback(ResultCumulativeAcknowledgementNotAllowedError); } return; } auto pair = prepareCumulativeAck(msgId); const auto& msgIdToAck = pair.first; const auto& readyToAck = pair.second; if (readyToAck) { consumerStatsBasePtr_->messageAcknowledged(ResultOk, CommandAck_AckType_Cumulative, 1); unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck); ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck, callback); } else if (callback) { callback(ResultOk); } interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), ResultOk, msgId); } bool 
ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType consumerType) { return consumerType != ConsumerKeyShared && consumerType != ConsumerShared; } std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& messageId) { auto messageIdImpl = Commands::getMessageIdImpl(messageId); auto batchedMessageIdImpl = std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl); auto batchSize = messageId.batchSize(); if (!batchedMessageIdImpl || batchedMessageIdImpl->ackIndividual(messageId.batchIndex())) { consumerStatsBasePtr_->messageAcknowledged(ResultOk, CommandAck_AckType_Individual, (batchSize > 0) ? batchSize : 1); unAckedMessageTrackerPtr_->remove(messageId); possibleSendToDeadLetterTopicMessages_.remove(messageId); return std::make_pair(discardBatch(messageId), true); } else if (config_.isBatchIndexAckEnabled()) { return std::make_pair(messageId, true); } else { return std::make_pair(MessageId{}, false); } } std::pair<MessageId, bool> ConsumerImpl::prepareCumulativeAck(const MessageId& messageId) { auto messageIdImpl = Commands::getMessageIdImpl(messageId); auto batchedMessageIdImpl = std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl); if (!batchedMessageIdImpl || batchedMessageIdImpl->ackCumulative(messageId.batchIndex())) { return std::make_pair(discardBatch(messageId), true); } else if (config_.isBatchIndexAckEnabled()) { return std::make_pair(messageId, true); } else { if (batchedMessageIdImpl->shouldAckPreviousMessageId()) { return std::make_pair(batchedMessageIdImpl->getPreviousMessageId(), true); } else { return std::make_pair(MessageId{}, false); } } } void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) { unAckedMessageTrackerPtr_->remove(messageId); negativeAcksTracker_.add(messageId); } void ConsumerImpl::disconnectConsumer() { LOG_INFO("Broker notification of Closed consumer: " << consumerId_); resetCnx(); scheduleReconnection(get_shared_this_ptr()); } void ConsumerImpl::closeAsync(ResultCallback 
originalCallback) { auto callback = [this, originalCallback](Result result) { shutdown(); if (result == ResultOk) { LOG_INFO(getName() << "Closed consumer " << consumerId_); } else { LOG_WARN(getName() << "Failed to close consumer: " << result); } if (originalCallback) { originalCallback(result); } }; if (state_ != Ready) { callback(ResultAlreadyClosed); return; } LOG_INFO(getName() << "Closing consumer for topic " << topic_); state_ = Closing; incomingMessages_.close(); // Flush pending grouped ACK requests. if (ackGroupingTrackerPtr_) { ackGroupingTrackerPtr_->close(); } negativeAcksTracker_.close(); ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { // If connection is gone, also the consumer is closed on the broker side callback(ResultOk); return; } ClientImplPtr client = client_.lock(); if (!client) { // Client was already destroyed callback(ResultOk); return; } cancelTimers(); int requestId = client->newRequestId(); auto self = get_shared_this_ptr(); cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } const std::string& ConsumerImpl::getName() const { return consumerStr_; } void ConsumerImpl::shutdown() { if (ackGroupingTrackerPtr_) { ackGroupingTrackerPtr_->close(); } incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); resetCnx(); interceptors_->close(); auto client = client_.lock(); if (client) { client->cleanupConsumer(this); } negativeAcksTracker_.close(); cancelTimers(); consumerCreatedPromise_.setFailed(ResultAlreadyClosed); failPendingReceiveCallback(); failPendingBatchReceiveCallback(); state_ = Closed; } bool ConsumerImpl::isClosed() { return state_ == Closed; } bool ConsumerImpl::isOpen() { return state_ == Ready; } Result ConsumerImpl::pauseMessageListener() { if (!messageListener_) { return ResultInvalidConfiguration; } messageListenerRunning_ = false; return ResultOk; } Result 
ConsumerImpl::resumeMessageListener() { if (!messageListener_) { return ResultInvalidConfiguration; } if (messageListenerRunning_) { // Not paused return ResultOk; } messageListenerRunning_ = true; const size_t count = incomingMessages_.size(); for (size_t i = 0; i < count; i++) { // Trigger message listener callback in a separate thread listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); } // Check current permits and determine whether to send FLOW command this->increaseAvailablePermits(getCnx().lock(), 0); return ResultOk; } void ConsumerImpl::redeliverUnacknowledgedMessages() { static std::set<MessageId> emptySet; redeliverMessages(emptySet); unAckedMessageTrackerPtr_->clear(); } void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) { if (messageIds.empty()) { return; } if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) { redeliverUnacknowledgedMessages(); return; } ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { if (cnx->getServerProtocolVersion() >= proto::v2) { auto needRedeliverMsgs = std::make_shared<std::set<MessageId>>(); auto needCallBack = std::make_shared<std::atomic<int>>(messageIds.size()); auto self = get_shared_this_ptr(); // TODO Support MAX_REDELIVER_UNACKNOWLEDGED Avoid redelivering too many messages for (const auto& msgId : messageIds) { processPossibleToDLQ(msgId, [self, needRedeliverMsgs, &msgId, needCallBack](bool processSuccess) { if (!processSuccess) { needRedeliverMsgs->emplace(msgId); } if (--(*needCallBack) == 0 && !needRedeliverMsgs->empty()) { self->redeliverMessages(*needRedeliverMsgs); } }); } } } else { LOG_WARN("Connection not ready for Consumer - " << getConsumerId()); } } void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) { ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { if (cnx->getServerProtocolVersion() >= proto::v2) { 
cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_, messageIds)); LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for Consumer - " << getConsumerId()); } } else { LOG_DEBUG("Connection not ready for Consumer - " << getConsumerId()); } } int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } Lock lock(mutex_); if (brokerConsumerStats_.isValid()) { LOG_DEBUG(getName() << "Serving data from cache"); BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_; lock.unlock(); callback(ResultOk, BrokerConsumerStats(std::make_shared<BrokerConsumerStatsImpl>(brokerConsumerStats_))); return; } lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { if (cnx->getServerProtocolVersion() >= proto::v8) { ClientImplPtr client = client_.lock(); uint64_t requestId = client->newRequestId(); LOG_DEBUG(getName() << " Sending ConsumerStats Command for Consumer - " << getConsumerId() << ", requestId - " << requestId); cnx->newConsumerStats(consumerId_, requestId) .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2, callback)); return; } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v7"); callback(ResultUnsupportedVersionError, BrokerConsumerStats()); return; } } LOG_ERROR(getName() << " Client Connection not ready for Consumer"); callback(ResultNotConnected, BrokerConsumerStats()); } void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsImpl brokerConsumerStats, BrokerConsumerStatsCallback callback) { if (res == ResultOk) { Lock 
lock(mutex_); brokerConsumerStats.setCacheTime(config_.getBrokerConsumerStatsCacheTimeInMs()); brokerConsumerStats_ = brokerConsumerStats; } if (callback) { callback(res, BrokerConsumerStats(std::make_shared<BrokerConsumerStatsImpl>(brokerConsumerStats))); } } void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { const auto state = state_.load(); if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } ClientImplPtr client = client_.lock(); if (!client) { LOG_ERROR(getName() << "Client is expired when seekAsync " << msgId); return; } const auto requestId = client->newRequestId(); seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback); } void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { const auto state = state_.load(); if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } ClientImplPtr client = client_.lock(); if (!client) { LOG_ERROR(getName() << "Client is expired when seekAsync " << timestamp); return; } const auto requestId = client->newRequestId(); seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(), timestamp, callback); } bool ConsumerImpl::isReadCompacted() { return readCompacted_; } inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const MessageId& messageId) { return lastMessageIdInBroker > messageId && lastMessageIdInBroker.entryId() != -1; } void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) { const auto startMessageId = startMessageId_.get(); Lock lock(mutexForMessageId_); const auto messageId = (lastDequedMessageId_ == MessageId::earliest()) ? 
startMessageId.value() : lastDequedMessageId_; if (messageId == MessageId::latest()) { lock.unlock(); getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { if (result != ResultOk) { callback(result, {}); return; } if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { // We only care about comparing ledger ids and entry ids as mark delete position doesn't have // other ids such as batch index callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(), response.getLastMessageId()) < 0); } else { callback(ResultOk, false); } }); } else { if (hasMoreMessages(lastMessageIdInBroker_, messageId)) { lock.unlock(); callback(ResultOk, true); return; } lock.unlock(); getLastMessageIdAsync([callback, messageId](Result result, const GetLastMessageIdResponse& response) { callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId)); }); } } void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { const auto state = state_.load(); if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed, GetLastMessageIdResponse()); } return; } TimeDuration operationTimeout = seconds(client_.lock()->conf().getOperationTimeoutSeconds()); BackoffPtr backoff = std::make_shared<Backoff>(milliseconds(100), operationTimeout * 2, milliseconds(0)); DeadlineTimerPtr timer = executor_->createDeadlineTimer(); internalGetLastMessageIdAsync(backoff, operationTimeout, timer, callback); } void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime, const DeadlineTimerPtr& timer, BrokerGetLastMessageIdCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { if (cnx->getServerProtocolVersion() >= proto::v12) { ClientImplPtr client = client_.lock(); uint64_t requestId = client->newRequestId(); LOG_DEBUG(getName() << " 
Sending getLastMessageId Command for Consumer - " << getConsumerId() << ", requestId - " << requestId); auto self = get_shared_this_ptr(); cnx->newGetLastMessageId(consumerId_, requestId) .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) { if (result == ResultOk) { LOG_DEBUG(getName() << "getLastMessageId: " << response); Lock lock(mutexForMessageId_); lastMessageIdInBroker_ = response.getLastMessageId(); lock.unlock(); } else { LOG_ERROR(getName() << "Failed to getLastMessageId: " << result); } callback(result, response); }); } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v12"); callback(ResultUnsupportedVersionError, MessageId()); } } else { TimeDuration next = std::min(remainTime, backoff->next()); if (next.total_milliseconds() <= 0) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); callback(ResultNotConnected, MessageId()); return; } remainTime -= next; timer->expires_from_now(next); auto self = shared_from_this(); timer->async_wait([this, backoff, remainTime, timer, next, callback, self](const boost::system::error_code& ec) -> void { if (ec == boost::asio::error::operation_aborted) { LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "]."); return; } if (ec) { LOG_ERROR(getName() << " Failed to get last message id, code[" << ec << "]."); return; } LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in " << next.total_milliseconds() << " ms") this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback); }); } } void ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) { negativeAcksTracker_.setEnabledForTesting(enabled); } void ConsumerImpl::trackMessage(const MessageId& messageId) { if (hasParent_) { unAckedMessageTrackerPtr_->remove(messageId); } else { unAckedMessageTrackerPtr_->add(messageId); 
} } bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp, ResultCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); callback(ResultNotConnected); return; } const auto originalSeekMessageId = seekMessageId_.get(); seekMessageId_ = seekId; duringSeek_ = true; if (timestamp > 0) { LOG_INFO(getName() << " Seeking subscription to " << timestamp); } else { LOG_INFO(getName() << " Seeking subscription to " << seekId); } std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()}; cnx->sendRequestWithId(seek, requestId) .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, const ResponseData& responseData) { auto self = weakSelf.lock(); if (!self) { callback(result); return; } if (result == ResultOk) { LOG_INFO(getName() << "Seek successfully"); ackGroupingTrackerPtr_->flushAndClean(); incomingMessages_.clear(); Lock lock(mutexForMessageId_); lastDequedMessageId_ = MessageId::earliest(); lock.unlock(); } else { LOG_ERROR(getName() << "Failed to seek: " << result); seekMessageId_ = originalSeekMessageId; duringSeek_ = false; } callback(result); }); } bool ConsumerImpl::isPriorBatchIndex(int32_t idx) { return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().batchIndex() : idx <= startMessageId_.get().value().batchIndex(); } bool ConsumerImpl::isPriorEntryIndex(int64_t idx) { return config_.isStartMessageIdInclusive() ? 
idx < startMessageId_.get().value().entryId() : idx <= startMessageId_.get().value().entryId(); } bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const { if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy_.getMaxNumMessages() > 0 && incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) || (batchReceivePolicy_.getMaxNumBytes() > 0 && incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes()); } std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() { return std::dynamic_pointer_cast<ConsumerImpl>(shared_from_this()); } void ConsumerImpl::cancelTimers() noexcept { boost::system::error_code ec; batchReceiveTimer_->cancel(ec); checkExpiredChunkedTimer_->cancel(ec); } void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) { auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId); if (!messages) { cb(false); return; } // Initialize deadLetterProducer_ if (!deadLetterProducer_) { std::lock_guard<std::mutex> createLock(createProducerLock_); if (!deadLetterProducer_) { deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>(); ProducerConfiguration producerConfiguration; producerConfiguration.setSchema(config_.getSchema()); producerConfiguration.setBlockIfQueueFull(false); producerConfiguration.impl_->initialSubscriptionName = deadLetterPolicy_.getInitialSubscriptionName(); ClientImplPtr client = client_.lock(); if (client) { auto self = get_shared_this_ptr(); client->createProducerAsync( deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration, [self](Result res, Producer producer) { if (res == ResultOk) { self->deadLetterProducer_->setValue(producer); } else { LOG_ERROR("Dead letter producer create exception with topic: " << self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res); self->deadLetterProducer_.reset(); } }); } else { LOG_WARN(getName() << "Client is destroyed and 
cannot create dead letter producer."); return; } } } for (const auto& message : messages.value()) { std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()}; deadLetterProducer_->getFuture().addListener([weakSelf, message, messageId, cb](Result res, Producer producer) { auto self = weakSelf.lock(); if (!self) { return; } auto originMessageId = message.getMessageId(); std::stringstream originMessageIdStr; originMessageIdStr << originMessageId; MessageBuilder msgBuilder; msgBuilder.setAllocatedContent(const_cast<void*>(message.getData()), message.getLength()) .setProperties(message.getProperties()) .setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str()) .setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName()); if (message.hasPartitionKey()) { msgBuilder.setPartitionKey(message.getPartitionKey()); } if (message.hasOrderingKey()) { msgBuilder.setOrderingKey(message.getOrderingKey()); } producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId, messageId, cb]( Result res, const MessageId& messageIdInDLQ) { auto self = weakSelf.lock(); if (!self) { return; } if (res == ResultOk) { if (self->state_ != Ready) { LOG_WARN( "Send to the DLQ successfully, but consumer is not ready. 
ignore acknowledge : " << self->state_); cb(false); return; } self->possibleSendToDeadLetterTopicMessages_.remove(messageId); self->acknowledgeAsync(originMessageId, [weakSelf, originMessageId, cb](Result result) { auto self = weakSelf.lock(); if (!self) { return; } if (result != ResultOk) { LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" << self->consumerName_ << "} Failed to acknowledge the message {" << originMessageId << "} of the original topic but send to the DLQ successfully : " << result); cb(false); } else { LOG_DEBUG("Send msg:" << originMessageId << "to DLQ success and acknowledge success."); cb(true); } }); } else { LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" << self->consumerName_ << "} Failed to send DLQ message to {" << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id " << "{" << originMessageId << "} : " << res); cb(false); } }); }); } } } /* namespace pulsar */