lib/Client.cc (143 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include <pulsar/Client.h> #include <iostream> #include <memory> #include <utility> #include "ClientImpl.h" #include "Int64SerDes.h" #include "LogUtils.h" #include "LookupService.h" #include "TopicName.h" #include "Utils.h" DECLARE_LOG_OBJECT() namespace pulsar { Client::Client(const std::shared_ptr<ClientImpl> impl) : impl_(impl) {} Client::Client(const std::string& serviceUrl) : impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration(), true)) {} Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration) : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, true)) {} Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections) : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, poolConnections)) {} Result Client::createProducer(const std::string& topic, Producer& producer) { return createProducer(topic, ProducerConfiguration(), producer); } Result Client::createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer) { Promise<Result, Producer> promise; createProducerAsync(topic, conf, WaitForCallbackValue<Producer>(promise)); Future<Result, Producer> future = promise.getFuture(); return future.get(producer); } void Client::createProducerAsync(const std::string& topic, CreateProducerCallback callback) { createProducerAsync(topic, ProducerConfiguration(), callback); } void Client::createProducerAsync(const std::string& topic, ProducerConfiguration conf, CreateProducerCallback callback) { impl_->createProducerAsync(topic, conf, callback); } Result Client::subscribe(const std::string& topic, const std::string& subscriptionName, Consumer& consumer) { return subscribe(topic, subscriptionName, ConsumerConfiguration(), consumer); } Result Client::subscribe(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, Consumer& consumer) { Promise<Result, Consumer> promise; subscribeAsync(topic, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise)); Future<Result, Consumer> future = promise.getFuture(); return future.get(consumer); } void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName, SubscribeCallback callback) { subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), callback); } void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback) { LOG_INFO("Subscribing on Topic :" << topic); impl_->subscribeAsync(topic, subscriptionName, conf, callback); } Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName, Consumer& consumer) { return subscribe(topics, subscriptionName, ConsumerConfiguration(), consumer); } Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, Consumer& consumer) { Promise<Result, Consumer> promise; subscribeAsync(topics, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise)); Future<Result, Consumer> future = promise.getFuture(); return future.get(consumer); } void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName, SubscribeCallback callback) { subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), callback); } void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback) { impl_->subscribeAsync(topics, subscriptionName, conf, callback); } Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, Consumer& consumer) { return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer); } Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, Consumer& consumer) { Promise<Result, Consumer> promise; subscribeWithRegexAsync(regexPattern, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise)); Future<Result, Consumer> future = promise.getFuture(); return future.get(consumer); } void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, SubscribeCallback callback) { subscribeWithRegexAsync(regexPattern, subscriptionName, ConsumerConfiguration(), callback); } void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback) { impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback); } Result Client::createReader(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, Reader& reader) { Promise<Result, Reader> promise; createReaderAsync(topic, startMessageId, conf, WaitForCallbackValue<Reader>(promise)); Future<Result, Reader> future = promise.getFuture(); return future.get(reader); } void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, ReaderCallback callback) { impl_->createReaderAsync(topic, startMessageId, conf, callback); } Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf, TableView& tableView) { Promise<Result, TableView> promise; createTableViewAsync(topic, conf, WaitForCallbackValue<TableView>(promise)); Future<Result, TableView> future = promise.getFuture(); return future.get(tableView); } void Client::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, TableViewCallback callBack) { impl_->createTableViewAsync(topic, conf, callBack); } Result Client::getPartitionsForTopic(const std::string& topic, std::vector<std::string>& partitions) { Promise<Result, std::vector<std::string> > promise; getPartitionsForTopicAsync(topic, WaitForCallbackValue<std::vector<std::string> >(promise)); Future<Result, std::vector<std::string> > future = promise.getFuture(); return future.get(partitions); } void Client::getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback) { impl_->getPartitionsForTopicAsync(topic, callback); } Result Client::close() { Promise<bool, Result> promise; closeAsync(WaitForCallback(promise)); Result result; promise.getFuture().get(result); return result; } void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); } void Client::shutdown() { impl_->shutdown(); } uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); } uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); } void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, std::function<void(Result, const SchemaInfo&)> callback) { impl_->getLookup() ->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") .addListener(callback); } } // namespace pulsar