void ClientManagerImpl::ack()

in cpp/source/client/ClientManagerImpl.cpp [981:1092]


void ClientManagerImpl::ack(const std::string& target,
                            const Metadata& metadata,
                            const AckMessageRequest& request,
                            std::chrono::milliseconds timeout,
                            const std::function<void(const std::error_code&)>& cb) {
  std::string target_host(target.data(), target.length());
  SPDLOG_DEBUG("Prepare to ack message against {} asynchronously. AckMessageRequest: {}", target_host,
               request.DebugString());
  RpcClientSharedPtr client = getRpcClient(target_host);

  auto invocation_context = new InvocationContext<AckMessageResponse>();
  invocation_context->task_name = fmt::format("Ack messages against {}", target);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  // TODO: Use capture by move and pass-by-value paradigm when C++ 14 is available.
  auto callback = [request, cb](const InvocationContext<AckMessageResponse>* invocation_context) {
    std::error_code ec;
    if (!invocation_context->status.ok()) {
      ec = ErrorCode::RequestTimeout;
      cb(ec);
      return;
    }

    auto&& status = invocation_context->response.status();
    switch (status.code()) {
      case rmq::Code::OK: {
        SPDLOG_DEBUG("Ack OK. host={}", invocation_context->remote_address);
        break;
      }

      case rmq::Code::MULTIPLE_RESULTS: {
        SPDLOG_DEBUG("Server returns multiple results. host={}", invocation_context->remote_address);
        // Treat it as successful, allowing top tier processing according to response entries.
        break;
      }

      case rmq::Code::ILLEGAL_TOPIC: {
        SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::IllegalTopic;
        break;
      }

      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
        SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::IllegalConsumerGroup;
        break;
      }

      case rmq::Code::INVALID_RECEIPT_HANDLE: {
        SPDLOG_WARN("InvalidReceiptHandle: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::InvalidReceiptHandle;
        break;
      }

      case rmq::Code::CLIENT_ID_REQUIRED: {
        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::InternalClientError;
        break;
      }

      case rmq::Code::TOPIC_NOT_FOUND: {
        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::TopicNotFound;
        break;
      }

      case rmq::Code::UNAUTHORIZED: {
        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
        break;
      }

      case rmq::Code::FORBIDDEN: {
        SPDLOG_WARN("PermissionDenied: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
        break;
      }

      case rmq::Code::TOO_MANY_REQUESTS: {
        SPDLOG_WARN("TooManyRequests: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::TooManyRequests;
        break;
      }

      case rmq::Code::INTERNAL_SERVER_ERROR: {
        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
        break;
      }

      case rmq::Code::PROXY_TIMEOUT: {
        SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::GatewayTimeout;
        break;
      }

      default: {
        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. host={}", invocation_context->remote_address);
        ec = ErrorCode::NotSupported;
        break;
      }
    }
    cb(ec);
  };
  invocation_context->callback = callback;
  client->asyncAck(request, invocation_context);
}