lib/ConsumerConfiguration.cc (230 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include <pulsar/ConsumerConfiguration.h> #include <stdexcept> #include "ConsumerConfigurationImpl.h" namespace pulsar { const static std::string emptyString; ConsumerConfiguration::ConsumerConfiguration() : impl_(std::make_shared<ConsumerConfigurationImpl>()) {} ConsumerConfiguration::~ConsumerConfiguration() {} ConsumerConfiguration::ConsumerConfiguration(const ConsumerConfiguration& x) : impl_(x.impl_) {} ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfiguration& x) { impl_ = x.impl_; return *this; } ConsumerConfiguration ConsumerConfiguration::clone() const { ConsumerConfiguration newConf; newConf.impl_.reset(new ConsumerConfigurationImpl(*this->impl_)); return newConf; } ConsumerConfiguration& ConsumerConfiguration::setSchema(const SchemaInfo& schemaInfo) { impl_->schemaInfo = schemaInfo; return *this; } const SchemaInfo& ConsumerConfiguration::getSchema() const { return impl_->schemaInfo; } long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const { return impl_->brokerConsumerStatsCacheTimeInMs; } void ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs) { impl_->brokerConsumerStatsCacheTimeInMs = cacheTimeInMs; } ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) { impl_->consumerType = consumerType; return *this; } ConsumerType ConsumerConfiguration::getConsumerType() const { return impl_->consumerType; } ConsumerConfiguration& ConsumerConfiguration::setMessageListener(MessageListener messageListener) { impl_->messageListener = messageListener; impl_->hasMessageListener = true; return *this; } MessageListener ConsumerConfiguration::getMessageListener() const { return impl_->messageListener; } bool ConsumerConfiguration::hasMessageListener() const { return impl_->hasMessageListener; } ConsumerConfiguration& ConsumerConfiguration::setConsumerEventListener( ConsumerEventListenerPtr eventListener) { impl_->eventListener = eventListener; impl_->hasConsumerEventListener = true; return *this; } ConsumerEventListenerPtr ConsumerConfiguration::getConsumerEventListener() const { return impl_->eventListener; } bool ConsumerConfiguration::hasConsumerEventListener() const { return impl_->hasConsumerEventListener; } void ConsumerConfiguration::setReceiverQueueSize(int size) { impl_->receiverQueueSize = size; } int ConsumerConfiguration::getReceiverQueueSize() const { return impl_->receiverQueueSize; } void ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions(int size) { impl_->maxTotalReceiverQueueSizeAcrossPartitions = size; } int ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions() const { return impl_->maxTotalReceiverQueueSizeAcrossPartitions; } const std::string& ConsumerConfiguration::getConsumerName() const { return impl_->consumerName; } void ConsumerConfiguration::setConsumerName(const std::string& consumerName) { impl_->consumerName = consumerName; } long ConsumerConfiguration::getUnAckedMessagesTimeoutMs() const { return impl_->unAckedMessagesTimeoutMs; } void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) { if (milliSeconds < 10000 && milliSeconds != 0) { throw std::invalid_argument( "Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds."); } impl_->unAckedMessagesTimeoutMs = milliSeconds; } long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; } void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) { impl_->tickDurationInMs = milliSeconds; } void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) { impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis; } long ConsumerConfiguration::getNegativeAckRedeliveryDelayMs() const { return impl_->negativeAckRedeliveryDelayMs; } void ConsumerConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) { impl_->ackGroupingTimeMs = ackGroupingMillis; } long ConsumerConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; } void ConsumerConfiguration::setAckGroupingMaxSize(long maxGroupingSize) { impl_->ackGroupingMaxSize = maxGroupingSize; } long ConsumerConfiguration::getAckGroupingMaxSize() const { return impl_->ackGroupingMaxSize; } bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader != NULL); } const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; } ConsumerConfiguration& ConsumerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) { impl_->cryptoKeyReader = cryptoKeyReader; return *this; } ConsumerCryptoFailureAction ConsumerConfiguration::getCryptoFailureAction() const { return impl_->cryptoFailureAction; } ConsumerConfiguration& ConsumerConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction action) { impl_->cryptoFailureAction = action; return *this; } bool ConsumerConfiguration::isReadCompacted() const { return impl_->readCompacted; } void ConsumerConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; } void ConsumerConfiguration::setSubscriptionInitialPosition(InitialPosition subscriptionInitialPosition) { impl_->subscriptionInitialPosition = subscriptionInitialPosition; } InitialPosition ConsumerConfiguration::getSubscriptionInitialPosition() const { return impl_->subscriptionInitialPosition; } void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) { impl_->patternAutoDiscoveryPeriod = periodInSeconds; } int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod; } void ConsumerConfiguration::setReplicateSubscriptionStateEnabled(bool enabled) { impl_->replicateSubscriptionStateEnabled = enabled; } bool ConsumerConfiguration::isReplicateSubscriptionStateEnabled() const { return impl_->replicateSubscriptionStateEnabled; } bool ConsumerConfiguration::hasProperty(const std::string& name) const { const std::map<std::string, std::string>& m = impl_->properties; return m.find(name) != m.end(); } const std::string& ConsumerConfiguration::getProperty(const std::string& name) const { if (hasProperty(name)) { const std::map<std::string, std::string>& m = impl_->properties; return m.at(name); } else { return emptyString; } } std::map<std::string, std::string>& ConsumerConfiguration::getProperties() const { return impl_->properties; } ConsumerConfiguration& ConsumerConfiguration::setProperty(const std::string& name, const std::string& value) { impl_->properties.insert(std::make_pair(name, value)); return *this; } ConsumerConfiguration& ConsumerConfiguration::setProperties( const std::map<std::string, std::string>& properties) { for (std::map<std::string, std::string>::const_iterator it = properties.begin(); it != properties.end(); it++) { setProperty(it->first, it->second); } return *this; } std::map<std::string, std::string>& ConsumerConfiguration::getSubscriptionProperties() const { return impl_->subscriptionProperties; } ConsumerConfiguration& ConsumerConfiguration::setSubscriptionProperties( const std::map<std::string, std::string>& subscriptionProperties) { for (const auto& subscriptionProperty : subscriptionProperties) { impl_->subscriptionProperties.emplace(subscriptionProperty.first, subscriptionProperty.second); } return *this; } ConsumerConfiguration& ConsumerConfiguration::setPriorityLevel(int priorityLevel) { if (priorityLevel < 0) { throw std::invalid_argument("Consumer Config Exception: PriorityLevel should be nonnegative number."); } impl_->priorityLevel = priorityLevel; return *this; } int ConsumerConfiguration::getPriorityLevel() const { return impl_->priorityLevel; } ConsumerConfiguration& ConsumerConfiguration::setKeySharedPolicy(KeySharedPolicy keySharedPolicy) { impl_->keySharedPolicy = keySharedPolicy.clone(); return *this; } KeySharedPolicy ConsumerConfiguration::getKeySharedPolicy() const { return impl_->keySharedPolicy; } ConsumerConfiguration& ConsumerConfiguration::setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage) { impl_->maxPendingChunkedMessage = maxPendingChunkedMessage; return *this; } size_t ConsumerConfiguration::getMaxPendingChunkedMessage() const { return impl_->maxPendingChunkedMessage; } ConsumerConfiguration& ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull( bool autoAckOldestChunkedMessageOnQueueFull) { impl_->autoAckOldestChunkedMessageOnQueueFull = autoAckOldestChunkedMessageOnQueueFull; return *this; } bool ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const { return impl_->autoAckOldestChunkedMessageOnQueueFull; } ConsumerConfiguration& ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs( long expireTimeOfIncompleteChunkedMessageMs) { impl_->expireTimeOfIncompleteChunkedMessageMs = expireTimeOfIncompleteChunkedMessageMs; return *this; } long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const { return impl_->expireTimeOfIncompleteChunkedMessageMs; } ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool startMessageIdInclusive) { impl_->startMessageIdInclusive = startMessageIdInclusive; return *this; } bool ConsumerConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; } void ConsumerConfiguration::setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy) { impl_->batchReceivePolicy = batchReceivePolicy; } const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const { return impl_->batchReceivePolicy; } ConsumerConfiguration& ConsumerConfiguration::setBatchIndexAckEnabled(bool enabled) { impl_->batchIndexAckEnabled = enabled; return *this; } bool ConsumerConfiguration::isBatchIndexAckEnabled() const { return impl_->batchIndexAckEnabled; } void ConsumerConfiguration::setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy) { impl_->deadLetterPolicy = deadLetterPolicy; } const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { return impl_->deadLetterPolicy; } ConsumerConfiguration& ConsumerConfiguration::intercept( const std::vector<ConsumerInterceptorPtr>& interceptors) { impl_->interceptors.insert(impl_->interceptors.end(), interceptors.begin(), interceptors.end()); return *this; } const std::vector<ConsumerInterceptorPtr>& ConsumerConfiguration::getInterceptors() const { return impl_->interceptors; } ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool ackReceiptEnabled) { impl_->ackReceiptEnabled = ackReceiptEnabled; return *this; } bool ConsumerConfiguration::isAckReceiptEnabled() const { return impl_->ackReceiptEnabled; } ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode( RegexSubscriptionMode regexSubscriptionMode) { impl_->regexSubscriptionMode = regexSubscriptionMode; return *this; } RegexSubscriptionMode ConsumerConfiguration::getRegexSubscriptionMode() const { return impl_->regexSubscriptionMode; } } // namespace pulsar