in cpp/source/rocketmq/ProducerImpl.cpp [367:445]
bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, TransactionState resolution) {
EndTransactionRequest request;
const std::string& topic = transaction.topic;
request.mutable_topic()->set_name(topic);
request.mutable_topic()->set_resource_namespace(resourceNamespace());
request.set_message_id(transaction.message_id);
request.set_transaction_id(transaction.transaction_id);
std::string action;
switch (resolution) {
case TransactionState::COMMIT:
request.set_resolution(rmq::COMMIT);
action = "commit";
break;
case TransactionState::ROLLBACK:
request.set_resolution(rmq::ROLLBACK);
action = "rollback";
break;
}
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(config(), metadata);
bool completed = false;
bool success = false;
auto span = opencensus::trace::Span::BlankSpan();
if (!transaction.trace_context.empty() && client_config_.sampler_) {
// Trace transactional message
opencensus::trace::SpanContext span_context =
opencensus::trace::propagation::FromTraceParentHeader(transaction.trace_context);
std::string trace_operation_name = TransactionState::COMMIT == resolution
? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
: MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
std::string span_name = resourceNamespace() + "/" + transaction.topic + " " + trace_operation_name;
if (span_context.IsValid()) {
span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()});
} else {
span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
}
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
// TracingUtility::addUniversalSpanAttributes(message, config(), span);
}
auto mtx = std::make_shared<absl::Mutex>();
auto cv = std::make_shared<absl::CondVar>();
const auto& endpoint = transaction.target;
std::weak_ptr<ProducerImpl> publisher(shared_from_this());
auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) {
if (ec) {
{
span.SetStatus(opencensus::trace::StatusCode::ABORTED);
span.AddAnnotation(ec.message());
span.End();
}
SPDLOG_WARN("Failed to send {} transaction request to {}. Cause: ", action, endpoint, ec.message());
success = false;
} else {
{
span.SetStatus(opencensus::trace::StatusCode::OK);
span.End();
}
success = true;
}
{
absl::MutexLock lk(mtx.get());
completed = true;
cv->SignalAll();
}
};
client_manager_->endTransaction(transaction.target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()),
cb);
{
absl::MutexLock lk(mtx.get());
cv->Wait(mtx.get());
}
return success;
}