void ClientManagerImpl::endTransaction()

in cpp/source/client/ClientManagerImpl.cpp [1161:1273]


/**
 * Asynchronously issue an EndTransaction RPC against `target_host`.
 *
 * @param target_host Address used to look up the RPC client; also recorded as the
 *                    invocation context's remote address for logging.
 * @param metadata    Key/value pairs copied into the call context's metadata.
 * @param request     EndTransaction request forwarded to the RPC client.
 * @param timeout     Relative timeout; the RPC deadline is set to now + timeout.
 * @param cb          Completion callback. It is copied into the invocation callback and
 *                    receives an error code (default-constructed, i.e. no error, when the
 *                    response status is rmq::Code::OK) plus the response object.
 *
 * If no RPC client is available for `target_host`, `cb` is invoked synchronously on the
 * calling thread with ErrorCode::BadRequest and a default-constructed response.
 */
void ClientManagerImpl::endTransaction(
    const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request,
    std::chrono::milliseconds timeout,
    const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) {
  RpcClientSharedPtr client = getRpcClient(target_host);
  if (!client) {
    SPDLOG_WARN("No RPC client for {}", target_host);
    EndTransactionResponse response;
    std::error_code ec = ErrorCode::BadRequest;
    cb(ec, response);
    return;
  }

  SPDLOG_DEBUG("Prepare to endTransaction. TargetHost={}, Request: {}", target_host, request.DebugString());

  // NOTE(review): the context is heap-allocated and handed to asyncEndTransaction below;
  // presumably the RPC layer releases it once the call completes — confirm against
  // InvocationContext / RpcClient before changing ownership here.
  auto invocation_context = new InvocationContext<EndTransactionResponse>();
  // Three placeholders for three arguments: transaction-id, message-id and target host.
  invocation_context->task_name = fmt::format("End transaction[{}] of message[{}] against {}",
                                              request.transaction_id(), request.message_id(), target_host);
  invocation_context->remote_address = target_host;
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  // Set RPC deadline.
  auto deadline = std::chrono::system_clock::now() + timeout;
  invocation_context->context.set_deadline(deadline);

  // Translates the outcome into a std::error_code and forwards it to the user callback.
  // Only `cb` is captured (by copy); the peer address is read from the context itself.
  auto callback = [cb](const InvocationContext<EndTransactionResponse>* ctx) {
    std::error_code ec;

    // Transport-level (gRPC) failure: report it without inspecting the response status.
    if (!ctx->status.ok()) {
      SPDLOG_WARN("Failed to write EndTransaction to wire. gRPC-code: {}, gRPC-message: {}, host={}",
                  ctx->status.error_code(), ctx->status.error_message(), ctx->remote_address);
      ec = ErrorCode::BadRequest;
      cb(ec, ctx->response);
      return;
    }

    // Application-level status carried in the response: map each known code to an ErrorCode.
    auto&& status = ctx->response.status();
    auto&& peer_address = ctx->remote_address;
    switch (status.code()) {
      case rmq::Code::OK: {
        SPDLOG_DEBUG("endTransaction completed OK. Response: {}, host={}", ctx->response.DebugString(),
                     peer_address);
        break;
      }

      case rmq::Code::ILLEGAL_TOPIC: {
        SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::IllegalTopic;
        break;
      }

      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
        SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::IllegalConsumerGroup;
        break;
      }

      case rmq::Code::INVALID_TRANSACTION_ID: {
        SPDLOG_WARN("InvalidTransactionId: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::InvalidTransactionId;
        break;
      }

      case rmq::Code::CLIENT_ID_REQUIRED: {
        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::InternalClientError;
        break;
      }

      case rmq::Code::TOPIC_NOT_FOUND: {
        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::TopicNotFound;
        break;
      }

      case rmq::Code::UNAUTHORIZED: {
        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::Unauthorized;
        break;
      }

      case rmq::Code::FORBIDDEN: {
        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::Forbidden;
        break;
      }

      case rmq::Code::INTERNAL_SERVER_ERROR: {
        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::InternalServerError;
        break;
      }

      case rmq::Code::PROXY_TIMEOUT: {
        SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), peer_address);
        ec = ErrorCode::GatewayTimeout;
        break;
      }

      // Any status code this client does not recognise is surfaced as NotSupported.
      default: {
        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. {}, host={}", status.message(), peer_address);
        ec = ErrorCode::NotSupported;
        break;
      }
    }
    cb(ec, ctx->response);
  };

  invocation_context->callback = callback;
  client->asyncEndTransaction(request, invocation_context);
}