cpp/source/rocketmq/ProducerImpl.cpp (567 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "ProducerImpl.h" #include <algorithm> #include <cassert> #include <chrono> #include <memory> #include <system_error> #include <utility> #include "MixAll.h" #include "Protocol.h" #include "PublishInfoCallback.h" #include "SendContext.h" #include "Signature.h" #include "TracingUtility.h" #include "TransactionImpl.h" #include "UtilAll.h" #include "opencensus/trace/propagation/trace_context.h" #include "opencensus/trace/span.h" #include "rocketmq/ErrorCode.h" #include "rocketmq/Message.h" #include "rocketmq/SendReceipt.h" #include "rocketmq/Transaction.h" #include "rocketmq/TransactionChecker.h" #include "rocketmq/RecallReceipt.h" ROCKETMQ_NAMESPACE_BEGIN ProducerImpl::ProducerImpl() : ClientImpl(""), compress_body_threshold_(MixAll::DEFAULT_COMPRESS_BODY_THRESHOLD_) { // TODO: initialize client_config_ and fault_strategy_ } ProducerImpl::~ProducerImpl() { SPDLOG_INFO("Producer instance is destructed"); shutdown(); } void ProducerImpl::start() { ClientImpl::start(); State expecting = State::STARTING; if (!state_.compare_exchange_strong(expecting, State::STARTED)) { SPDLOG_ERROR("Start with unexpected state. Expecting: {}, Actual: {}", State::STARTING, state_.load(std::memory_order_relaxed)); return; } client_manager_->addClientObserver(shared_from_this()); } void ProducerImpl::shutdown() { State expected = State::STARTED; if (!state_.compare_exchange_strong(expected, State::STOPPING)) { SPDLOG_ERROR("Shutdown with unexpected state. Expecting: {}, Actual: {}", State::STOPPING, state_.load(std::memory_order_relaxed)); return; } notifyClientTermination(); ClientImpl::shutdown(); assert(State::STOPPED == state_.load()); SPDLOG_INFO("Producer instance stopped"); } void ProducerImpl::notifyClientTermination() { } bool ProducerImpl::isRunning() const { return State::STARTED == state_.load(std::memory_order_relaxed); } void ProducerImpl::ensureRunning(std::error_code& ec) const noexcept { if (!isRunning()) { ec = ErrorCode::IllegalState; } } void ProducerImpl::validate(const Message& message, std::error_code& ec) { MixAll::validate(message, ec); if (!ec) { if (message.body().empty()) { SPDLOG_WARN("Body of the message is null, topic={}", message.topic()); ec = ErrorCode::MessageBodyEmpty; } if (message.body().length() > client_config_.publisher.max_body_size) { SPDLOG_WARN("Body of the message to send is too large, topic={}", message.topic()); ec = ErrorCode::PayloadTooLarge; } } } void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageRequest& request, const rmq::MessageQueue& message_queue) { auto msg = new rmq::Message(); msg->mutable_topic()->set_resource_namespace(resourceNamespace()); msg->mutable_topic()->set_name(message.topic()); auto system_properties = msg->mutable_system_properties(); // Handle Tag if (!message.tag().empty()) { system_properties->set_tag(message.tag()); } // Handle Key const auto& keys = message.keys(); if (!keys.empty()) { system_properties->mutable_keys()->Add(keys.begin(), keys.end()); } // TraceContext if (!message.traceContext().empty()) { const auto& trace_context = message.traceContext(); if (!trace_context.empty()) { system_properties->set_trace_context(trace_context); } } // Delivery Timestamp if (message.deliveryTimestamp().time_since_epoch().count()) { auto delivery_timestamp = message.deliveryTimestamp(); if (delivery_timestamp.time_since_epoch().count()) { auto duration = delivery_timestamp.time_since_epoch(); system_properties->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()); auto mutable_delivery_timestamp = system_properties->mutable_delivery_timestamp(); mutable_delivery_timestamp->set_seconds(std::chrono::duration_cast<std::chrono::seconds>(duration).count()); mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count() % 1000000000); } } // Born-time auto duration = absl::Now() - absl::UnixEpoch(); int64_t seconds = absl::ToInt64Seconds(duration); system_properties->mutable_born_timestamp()->set_seconds(seconds); system_properties->mutable_born_timestamp()->set_nanos(absl::ToInt64Nanoseconds(duration - absl::Seconds(seconds))); system_properties->set_born_host(UtilAll::hostname()); if (message.deliveryTimestamp().time_since_epoch().count()) { system_properties->set_message_type(rmq::MessageType::DELAY); } else if (!message.group().empty()) { system_properties->set_message_type(rmq::MessageType::FIFO); } else if (message.extension().transactional) { system_properties->set_message_type(rmq::MessageType::TRANSACTION); } else { system_properties->set_message_type(rmq::MessageType::NORMAL); } if (message.body().size() >= compress_body_threshold_) { std::string compressed_body; UtilAll::compress(message.body(), compressed_body); msg->set_body(compressed_body); system_properties->set_body_encoding(rmq::Encoding::GZIP); } else { msg->set_body(message.body()); system_properties->set_body_encoding(rmq::Encoding::IDENTITY); } if (!message.group().empty()) { system_properties->set_message_group(message.group()); } system_properties->set_message_id(message.id()); system_properties->set_queue_id(message_queue.id()); // Forward user-defined-properties for (auto& item : message.properties()) { msg->mutable_user_properties()->insert({item.first, item.second}); } // Add into request request.mutable_messages()->AddAllocated(msg); SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString()); } SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept { ensureRunning(ec); if (ec) { SPDLOG_WARN("Producer is not running"); SendReceipt send_receipt{}; send_receipt.message = std::move(message); return send_receipt; } auto topic_publish_info = getPublishInfo(message->topic()); if (!topic_publish_info) { SPDLOG_WARN("Route of topic[{}] is not found", message->topic()); ec = ErrorCode::NotFound; SendReceipt send_receipt{}; send_receipt.message = std::move(message); return send_receipt; } std::vector<rmq::MessageQueue> message_queue_list; // null_opt_t absl::optional<std::string> message_group{}; if (!topic_publish_info->selectMessageQueues(message_group, message_queue_list)) { SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", message->topic()); ec = ErrorCode::NotFound; SendReceipt send_receipt{}; send_receipt.message = std::move(message); return send_receipt; } auto mtx = std::make_shared<absl::Mutex>(); auto cv = std::make_shared<absl::CondVar>(); bool completed = false; SendReceipt send_receipt; // Define callback auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) mutable { ec = code; auto& receipt_mut = const_cast<SendReceipt&>(receipt); send_receipt.target = std::move(receipt_mut.target); send_receipt.message_id = std::move(receipt_mut.message_id); send_receipt.message = std::move(receipt_mut.message); send_receipt.transaction_id = std::move(receipt_mut.transaction_id); send_receipt.recall_handle = std::move(receipt_mut.recall_handle); { absl::MutexLock lk(mtx.get()); completed = true; } cv->Signal(); }; send0(std::move(message), callback, message_queue_list); { absl::MutexLock lk(mtx.get()); if (!completed) { cv->Wait(mtx.get()); } } return send_receipt; } void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { std::error_code ec; ensureRunning(ec); if (ec) { SendReceipt send_receipt; send_receipt.message = std::move(message); cb(ec, send_receipt); } std::string topic = message->topic(); std::weak_ptr<ProducerImpl> client(shared_from_this()); // Walk-around: Use capture by move when C++14 is possible const Message* msg = message.release(); auto callback = [client, cb, msg](const std::error_code& ec, const TopicPublishInfoPtr& publish_info) { MessageConstPtr ptr(msg); // No route entries of the given topic is available if (ec) { SendReceipt send_receipt; send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } if (!publish_info) { std::error_code ec = ErrorCode::NotFound; SendReceipt send_receipt; send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } auto publisher = client.lock(); std::vector<rmq::MessageQueue> message_queue_list; if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) { std::error_code ec = ErrorCode::NotFound; SendReceipt send_receipt; send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } publisher->send0(std::move(ptr), cb, message_queue_list); }; getPublishInfoAsync(topic, callback); } void ProducerImpl::setTransactionChecker(TransactionChecker checker) { transaction_checker_ = std::move(checker); } void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { const std::string& target = urlOf(context->messageQueue()); if (target.empty()) { SPDLOG_WARN("Failed to resolve broker address from MessageQueue"); std::error_code ec = ErrorCode::BadGateway; context->onFailure(ec); return; } { // Trace Send RPC if (!context->message_->traceContext().empty() && client_config_.sampler_) { auto span_context = opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext()); auto span = opencensus::trace::Span::BlankSpan(); std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " + MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION; if (span_context.IsValid()) { span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()}); } else { span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()}); } span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION); span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION); TracingUtility::addUniversalSpanAttributes(*context->message_, config(), span); // Note: attempt-time is 0-based span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + context->attempt_times_); if (context->message_->deliveryTimestamp().time_since_epoch().count()) { span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP, absl::FormatTime(absl::FromChrono(context->message_->deliveryTimestamp()))); } auto ptr = const_cast<Message*>(context->message_.get()); ptr->traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context())); context->span_ = span; } } SendMessageRequest request; wrapSendMessageRequest(*context->message_, request, context->messageQueue()); Metadata metadata; Signature::sign(client_config_, metadata); auto callback = [context](const SendResult& send_result) { if (send_result.ec) { context->onFailure(send_result.ec); return; } context->onSuccess(send_result); }; client_manager_->send(target, metadata, request, callback); } void ProducerImpl::send0(MessageConstPtr message, const SendCallback& callback, std::vector<rmq::MessageQueue> list) { SendReceipt send_receipt; std::error_code ec; validate(*message, ec); if (ec) { send_receipt.message = std::move(message); callback(ec, send_receipt); return; } if (list.empty()) { ec = ErrorCode::NotFound; send_receipt.message = std::move(message); callback(ec, send_receipt); return; } auto context = std::make_shared<SendContext>( shared_from_this(), std::move(message), callback, std::move(list)); sendImpl(context); } bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, TransactionState resolution) { EndTransactionRequest request; const std::string& topic = transaction.topic; request.mutable_topic()->set_name(topic); request.mutable_topic()->set_resource_namespace(resourceNamespace()); request.set_message_id(transaction.message_id); request.set_transaction_id(transaction.transaction_id); std::string action; switch (resolution) { case TransactionState::COMMIT: request.set_resolution(rmq::COMMIT); action = "commit"; break; case TransactionState::ROLLBACK: request.set_resolution(rmq::ROLLBACK); action = "rollback"; break; } absl::flat_hash_map<std::string, std::string> metadata; Signature::sign(config(), metadata); bool completed = false; bool success = false; auto span = opencensus::trace::Span::BlankSpan(); if (!transaction.trace_context.empty() && client_config_.sampler_) { // Trace transactional message opencensus::trace::SpanContext span_context = opencensus::trace::propagation::FromTraceParentHeader(transaction.trace_context); std::string trace_operation_name = TransactionState::COMMIT == resolution ? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION; std::string span_name = resourceNamespace() + "/" + transaction.topic + " " + trace_operation_name; if (span_context.IsValid()) { span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()}); } else { span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()}); } span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name); span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name); // TracingUtility::addUniversalSpanAttributes(message, config(), span); } auto mtx = std::make_shared<absl::Mutex>(); auto cv = std::make_shared<absl::CondVar>(); const auto& endpoint = transaction.target; std::weak_ptr<ProducerImpl> publisher(shared_from_this()); auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) { if (ec) { { span.SetStatus(opencensus::trace::StatusCode::ABORTED); span.AddAnnotation(ec.message()); span.End(); } SPDLOG_WARN("Failed to send {} transaction request to {}. Cause: ", action, endpoint, ec.message()); success = false; } else { { span.SetStatus(opencensus::trace::StatusCode::OK); span.End(); } success = true; } { absl::MutexLock lk(mtx.get()); completed = true; cv->SignalAll(); } }; client_manager_->endTransaction( transaction.target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()), cb); { absl::MutexLock lk(mtx.get()); cv->Wait(mtx.get()); } return success; } void ProducerImpl::isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) { absl::MutexLock lk(&isolated_endpoints_mtx_); endpoints.insert(isolated_endpoints_.begin(), isolated_endpoints_.end()); } bool ProducerImpl::isEndpointIsolated(const std::string& target) { absl::MutexLock lk(&isolated_endpoints_mtx_); return isolated_endpoints_.contains(target); } void ProducerImpl::isolateEndpoint(const std::string& target) { absl::MutexLock lk(&isolated_endpoints_mtx_); isolated_endpoints_.insert(target); } SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) { MiniTransaction mini = {}; mini.topic = message->topic(); mini.trace_context = message->traceContext(); if (!message->group().empty()) { ec = ErrorCode::MessagePropertyConflictWithType; SPDLOG_WARN("FIFO message may not be transactional"); SendReceipt send_receipt{}; send_receipt.message = std::move(message); return send_receipt; } if (message->deliveryTimestamp().time_since_epoch().count()) { ec = ErrorCode::MessagePropertyConflictWithType; SPDLOG_WARN("Timed message may not be transactional"); SendReceipt send_receipt{}; send_receipt.message = std::move(message); return send_receipt; } Message* msg = const_cast<Message*>(message.get()); msg->mutableExtension().transactional = true; SendReceipt send_receipt = send(std::move(message), ec); mini.message_id = send_receipt.message_id; mini.transaction_id = send_receipt.transaction_id; mini.target = send_receipt.target; auto& impl = dynamic_cast<TransactionImpl&>(transaction); impl.appendMiniTransaction(mini); return send_receipt; } RecallReceipt ProducerImpl::recall(const std::string& topic, std::string& recall_handle, std::error_code& ec) noexcept { ensureRunning(ec); if (ec) { SPDLOG_WARN("Producer is not running"); return RecallReceipt{}; } auto topic_publish_info = getPublishInfo(topic); if (!topic_publish_info) { SPDLOG_WARN("Route of topic[{}] is not found", topic); ec = ErrorCode::NotFound; return RecallReceipt{}; } std::vector<rmq::MessageQueue> message_queue_list; absl::optional<std::string> message_group{}; if (!topic_publish_info->selectMessageQueues(message_group, message_queue_list)) { SPDLOG_WARN("Failed to select an addressable message queue for timer topic[{}]", topic); ec = ErrorCode::NotFound; return RecallReceipt{}; } const std::string& target = urlOf(message_queue_list.front()); if (target.empty()) { SPDLOG_WARN("Failed to resolve broker address from MessageQueue"); ec = ErrorCode::BadGateway; return RecallReceipt{}; } RecallMessageRequest request; request.mutable_topic()->set_resource_namespace(resourceNamespace()); request.mutable_topic()->set_name(topic); request.set_recall_handle(recall_handle); Metadata metadata; Signature::sign(client_config_, metadata); auto mtx = std::make_shared<absl::Mutex>(); auto cv = std::make_shared<absl::CondVar>(); RecallReceipt recall_receipt; auto callback = [&, mtx, cv, topic](const std::error_code& code, const RecallMessageResponse& response) { ec = code; if (!ec) { recall_receipt.message_id = response.message_id(); } { absl::MutexLock lk(mtx.get()); cv->SignalAll(); } }; client_manager_->recallMessage( target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()), callback); { absl::MutexLock lk(mtx.get()); cv->Wait(mtx.get()); } return recall_receipt; } void ProducerImpl::getPublishInfoAsync(const std::string& topic, const PublishInfoCallback& cb) { TopicPublishInfoPtr ptr; { absl::MutexLock lock(&topic_publish_info_mtx_); if (topic_publish_info_table_.contains(topic)) { ptr = topic_publish_info_table_.at(topic); } } std::error_code ec; if (ptr) { // Serve with the cached one cb(ec, ptr); return; } std::weak_ptr<ProducerImpl> producer(shared_from_this()); auto callback = [topic, cb, producer](const std::error_code& ec, const TopicRouteDataPtr& route) { if (ec) { cb(ec, nullptr); return; } auto publish_info = std::make_shared<TopicPublishInfo>(producer, topic, route); auto impl = producer.lock(); if (impl) { impl->cachePublishInfo(topic, publish_info); } cb(ec, publish_info); }; getRouteFor(topic, callback); } void ProducerImpl::cachePublishInfo(const std::string& topic, TopicPublishInfoPtr info) { absl::MutexLock lk(&topic_publish_info_mtx_); topic_publish_info_table_.insert_or_assign(topic, info); } TopicPublishInfoPtr ProducerImpl::getPublishInfo(const std::string& topic) { bool complete = false; auto mtx = std::make_shared<absl::Mutex>(); auto cv = std::make_shared<absl::CondVar>(); TopicPublishInfoPtr topic_publish_info; std::error_code error_code; auto cb = [&, mtx, cv](const std::error_code& ec, const TopicPublishInfoPtr& ptr) { absl::MutexLock lk(mtx.get()); topic_publish_info = ptr; error_code = ec; complete = true; cv->SignalAll(); }; getPublishInfoAsync(topic, cb); // Wait till acquiring topic publish info completes while (!complete) { absl::MutexLock lk(mtx.get()); cv->Wait(mtx.get()); } // TODO: propagate error_code to caller return topic_publish_info; } void ProducerImpl::prepareHeartbeatData(HeartbeatRequest& request) { request.set_client_type(rmq::ClientType::PRODUCER); } void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message) { if (transaction_checker_) { std::weak_ptr<ProducerImpl> producer(shared_from_this()); MiniTransaction transaction = {}; transaction.topic = message->topic(); transaction.message_id = message->id(); transaction.transaction_id = message->extension().transaction_id; transaction.trace_context = message->traceContext(); transaction.target = message->extension().target_endpoint; TransactionState state = transaction_checker_(*message); endTransaction0(transaction, state); } else { SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr"); } } void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) { absl::MutexLock lk(&topics_mtx_); for (auto& topic : topics_) { if (std::find(topics.begin(), topics.end(), topic) == topics.end()) { topics.push_back(topic); } } } void ProducerImpl::withTopics(const std::vector<std::string> &topics) { absl::MutexLock lk(&topics_mtx_); for (auto &topic: topics) { if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) { topics_.push_back(topic); } } } void ProducerImpl::buildClientSettings(rmq::Settings& settings) { settings.set_client_type(rmq::ClientType::PRODUCER); settings.mutable_publishing()->set_max_body_size(MixAll::MAX_MESSAGE_BODY_SIZE); settings.mutable_publishing()->set_validate_message_type(false); auto topics = settings.mutable_publishing()->mutable_topics(); { absl::MutexLock lk(&topics_mtx_); for (const auto& item : topics_) { auto topic = new rmq::Resource; topic->set_resource_namespace(resourceNamespace()); topic->set_name(item); topics->AddAllocated(topic); } } } ROCKETMQ_NAMESPACE_END