bool ClientManagerImpl::send()

in cpp/source/client/ClientManagerImpl.cpp [281:477]


// Asynchronously sends `request` to `target_host` and reports the outcome through `cb`.
//
// Parameters:
//   target_host - address used to look up the RPC client; also recorded as SendResult::target.
//   metadata    - key/value pairs copied into the invocation context before the call is issued.
//   request     - the send-message request handed to RpcClient::asyncSend().
//   cb          - completion callback; must be callable (asserted).
//
// Completion behaviour:
//   * If the owning ClientManager has been destroyed, or is not in State::STARTED, `cb` is NOT
//     invoked.
//   * A non-OK transport status is reported as ErrorCode::RequestTimeout.
//   * Otherwise the response status code is translated into the matching ErrorCode; on
//     rmq::Code::OK the first response entry populates message_id / transaction_id / recall_handle.
//
// Returns: always true; failures are delivered via `cb`.
bool ClientManagerImpl::send(const std::string& target_host,
                             const Metadata& metadata,
                             SendMessageRequest& request,
                             SendResultCallback cb) {
  assert(cb);
  SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.ShortDebugString());
  RpcClientSharedPtr client = getRpcClient(target_host);
  // Invocation context will be deleted in its onComplete() method.
  auto invocation_context = new InvocationContext<SendMessageResponse>();
  invocation_context->task_name = fmt::format("Send message to {}", target_host);
  invocation_context->remote_address = target_host;
  for (const auto& entry : metadata) {
    invocation_context->context.AddMetadata(entry.first, entry.second);
  }

  // NOTE: the topic of the first message used to be read here and captured by the callback, but
  // it was never used, and dereferencing messages().begin() is undefined behaviour when the
  // request carries no messages. It has been dropped on purpose.

  // Hold the manager weakly so that a pending RPC does not keep it alive.
  std::weak_ptr<ClientManager> client_manager(shared_from_this());
  auto completion_callback = [cb, client_manager,
                              target_host](const InvocationContext<SendMessageResponse>* ctx) {
    ClientManagerPtr client_manager_ptr = client_manager.lock();
    if (!client_manager_ptr) {
      return;
    }

    if (State::STARTED != client_manager_ptr->state()) {
      // TODO: Would this leak some memory?
      // NOTE(review): `cb` is not invoked on this path, so the caller never learns the outcome.
      return;
    }

    SendResult send_result = {};
    send_result.target = target_host;

    // Transport-level (gRPC) failure: every such error is surfaced as RequestTimeout.
    if (!ctx->status.ok()) {
      SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}",
                  ctx->remote_address, ctx->status.error_code(), ctx->status.error_message());
      send_result.ec = ErrorCode::RequestTimeout;
      cb(send_result);
      return;
    }

    // Application-level status carried inside the response.
    const auto& status = ctx->response.status();
    switch (status.code()) {
      case rmq::Code::OK: {
        if (!ctx->response.entries().empty()) {
          auto first = ctx->response.entries().begin();
          send_result.message_id = first->message_id();
          send_result.transaction_id = first->transaction_id();
          // unique handle to identify a message to recall,
          // only delay message is supported for now
          send_result.recall_handle = first->recall_handle();
        } else {
          // OK without entries: logged, but send_result.ec is left at its value-initialized state.
          SPDLOG_ERROR("Unexpected send-message-response: {}", ctx->response.ShortDebugString());
        }
        break;
      }

      case rmq::Code::ILLEGAL_TOPIC: {
        SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::IllegalTopic;
        break;
      }

      case rmq::Code::ILLEGAL_MESSAGE_TAG: {
        SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::IllegalMessageTag;
        break;
      }

      case rmq::Code::ILLEGAL_MESSAGE_KEY: {
        SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::IllegalMessageKey;
        break;
      }

      case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
        SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::IllegalMessageGroup;
        break;
      }

      case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
        SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::IllegalMessageProperty;
        break;
      }

      case rmq::Code::ILLEGAL_DELIVERY_TIME: {
        // Reported to the caller under the generic IllegalMessageProperty error code.
        SPDLOG_ERROR("IllegalDeliveryTime: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::IllegalMessageProperty;
        break;
      }

      case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
        SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::MessagePropertiesTooLarge;
        break;
      }

      case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
        SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::MessageBodyTooLarge;
        break;
      }

      case rmq::Code::MESSAGE_BODY_EMPTY: {
        SPDLOG_ERROR("MessageBodyEmpty: {}. Host={}", status.message(), ctx->remote_address);
        // NOTE(review): an empty body is reported as MessageBodyTooLarge. This looks like a
        // copy/paste slip; confirm whether ErrorCode offers a more precise value before changing.
        send_result.ec = ErrorCode::MessageBodyTooLarge;
        break;
      }

      case rmq::Code::TOPIC_NOT_FOUND: {
        SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::TopicNotFound;
        break;
      }

      case rmq::Code::NOT_FOUND: {
        SPDLOG_WARN("NotFound: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::NotFound;
        break;
      }

      case rmq::Code::UNAUTHORIZED: {
        SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::Unauthorized;
        break;
      }

      case rmq::Code::FORBIDDEN: {
        SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::Forbidden;
        break;
      }

      case rmq::Code::MESSAGE_CORRUPTED: {
        SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::MessageCorrupted;
        break;
      }

      case rmq::Code::TOO_MANY_REQUESTS: {
        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::TooManyRequests;
        break;
      }

      // Both codes share the same log line and error code.
      case rmq::Code::INTERNAL_SERVER_ERROR:
      case rmq::Code::HA_NOT_AVAILABLE: {
        SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::InternalServerError;
        break;
      }

      // All server-side timeout flavours are reported as GatewayTimeout.
      case rmq::Code::PROXY_TIMEOUT:
      case rmq::Code::MASTER_PERSISTENCE_TIMEOUT:
      case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), ctx->remote_address);
        send_result.ec = ErrorCode::GatewayTimeout;
        break;
      }

      case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
        SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", ctx->remote_address,
                    ctx->response.ShortDebugString());
        send_result.ec = ErrorCode::MessagePropertyConflictWithType;
        break;
      }

      default: {
        // A status code this SDK version does not recognise.
        SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", ctx->remote_address);
        send_result.ec = ErrorCode::NotSupported;
        break;
      }
    }

    cb(send_result);
  };

  invocation_context->callback = completion_callback;
  client->asyncSend(request, invocation_context);
  return true;
}