void ClientManagerImpl::forwardMessageToDeadLetterQueue()

in cpp/source/client/ClientManagerImpl.cpp [1275:1363]


void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& target_host,
                                                        const Metadata& metadata,
                                                        const ForwardMessageToDeadLetterQueueRequest& request,
                                                        std::chrono::milliseconds timeout,
                                                        const std::function<void(const std::error_code&)>& cb) {
  SPDLOG_DEBUG("ForwardMessageToDeadLetterQueue Request: {}", request.DebugString());
  auto client = getRpcClient(target_host);
  auto invocation_context = new InvocationContext<ForwardMessageToDeadLetterQueueResponse>();
  invocation_context->task_name =
      fmt::format("Forward message[{}] to DLQ against {}", request.message_id(), target_host);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  auto callback = [cb](const InvocationContext<ForwardMessageToDeadLetterQueueResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to transmit SendMessageToDeadLetterQueueRequest to host={}",
                  invocation_context->remote_address);
      std::error_code ec = ErrorCode::BadRequest;
      cb(ec);
      return;
    }

    SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from server[host={}]", invocation_context->remote_address);
    std::error_code ec;
    auto&& status = invocation_context->response.status();
    auto&& peer_address = invocation_context->remote_address;
    switch (status.code()) {
      case rmq::Code::OK: {
        break;
      }
      case rmq::Code::ILLEGAL_TOPIC: {
        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
        ec = ErrorCode::IllegalTopic;
        break;
      }

      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
        SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), peer_address);
        ec = ErrorCode::IllegalConsumerGroup;
        break;
      }

      case rmq::Code::INVALID_RECEIPT_HANDLE: {
        SPDLOG_WARN("IllegalReceiptHandle: {}. Host={}", status.message(), peer_address);
        ec = ErrorCode::InvalidReceiptHandle;
        break;
      }

      case rmq::Code::CLIENT_ID_REQUIRED: {
        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
        ec = ErrorCode::InternalClientError;
        break;
      }

      case rmq::Code::TOPIC_NOT_FOUND: {
        ec = ErrorCode::TopicNotFound;
        break;
      }

      case rmq::Code::INTERNAL_SERVER_ERROR: {
        ec = ErrorCode::ServiceUnavailable;
        break;
      }
        
      case rmq::Code::TOO_MANY_REQUESTS: {
        ec = ErrorCode::TooManyRequests;
        break;
      }

      case rmq::Code::PROXY_TIMEOUT: {
        ec = ErrorCode::GatewayTimeout;
        break;
      }

      default: {
        ec = ErrorCode::NotImplemented;
        break;
      }
    }
    cb(ec);
  };

  invocation_context->callback = callback;
  client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
}