bool ProducerImpl::endTransaction0()

in cpp/source/rocketmq/ProducerImpl.cpp [387:466]


bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, TransactionState resolution) {
  EndTransactionRequest request;
  const std::string& topic = transaction.topic;
  request.mutable_topic()->set_name(topic);
  request.mutable_topic()->set_resource_namespace(resourceNamespace());
  request.set_message_id(transaction.message_id);
  request.set_transaction_id(transaction.transaction_id);

  std::string action;
  switch (resolution) {
    case TransactionState::COMMIT:
      request.set_resolution(rmq::COMMIT);
      action = "commit";
      break;
    case TransactionState::ROLLBACK:
      request.set_resolution(rmq::ROLLBACK);
      action = "rollback";
      break;
  }
  absl::flat_hash_map<std::string, std::string> metadata;
  Signature::sign(config(), metadata);
  bool completed = false;
  bool success = false;
  auto span = opencensus::trace::Span::BlankSpan();
  if (!transaction.trace_context.empty() && client_config_.sampler_) {
    // Trace transactional message
    opencensus::trace::SpanContext span_context =
        opencensus::trace::propagation::FromTraceParentHeader(transaction.trace_context);
    std::string trace_operation_name = TransactionState::COMMIT == resolution
                                           ? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
                                           : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
    std::string span_name = resourceNamespace() + "/" + transaction.topic + " " + trace_operation_name;
    if (span_context.IsValid()) {
      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()});
    } else {
      span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
    }
    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
    // TracingUtility::addUniversalSpanAttributes(message, config(), span);
  }

  auto mtx = std::make_shared<absl::Mutex>();
  auto cv = std::make_shared<absl::CondVar>();
  const auto& endpoint = transaction.target;
  std::weak_ptr<ProducerImpl> publisher(shared_from_this());

  auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) {
    if (ec) {
      {
        span.SetStatus(opencensus::trace::StatusCode::ABORTED);
        span.AddAnnotation(ec.message());
        span.End();
      }
      SPDLOG_WARN("Failed to send {} transaction request to {}. Cause: ", action, endpoint, ec.message());
      success = false;
    } else {
      {
        span.SetStatus(opencensus::trace::StatusCode::OK);
        span.End();
      }
      success = true;
    }

    {
      absl::MutexLock lk(mtx.get());
      completed = true;
      cv->SignalAll();
    }
  };

  client_manager_->endTransaction(
      transaction.target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()), cb);

  {
    absl::MutexLock lk(mtx.get());
    cv->Wait(mtx.get());
  }
  return success;
}