in cpp/source/client/ClientManagerImpl.cpp [648:749]
/**
 * Asynchronously send a QueryAssignment request to the given endpoint.
 *
 * The request is dispatched through the RpcClient obtained for `target`. Every
 * entry of `metadata` is attached to the call context, and the call deadline is
 * set to "now + timeout".
 *
 * @param target   Address of the remote endpoint; also recorded as the
 *                 invocation's remote address and used in its task name.
 * @param metadata Key/value pairs added to the call context as metadata.
 * @param request  The QueryAssignment request to send.
 * @param timeout  Relative timeout used to compute the call deadline.
 * @param cb       Completion callback. It receives an error code and the
 *                 response carried by the invocation context:
 *                 - a default-constructed (no-error) code when the response
 *                   status is rmq::Code::OK;
 *                 - ErrorCode::RequestTimeout whenever the transport-level
 *                   status is not OK, regardless of the actual reason (the
 *                   reason is only logged);
 *                 - otherwise the ErrorCode mapped from the response status
 *                   code, with ErrorCode::NotSupported for unrecognised codes.
 */
void ClientManagerImpl::queryAssignment(
    const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request,
    std::chrono::milliseconds timeout,
    const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) {
  SPDLOG_DEBUG("Prepare to send query assignment request to broker[address={}]", target);
  std::shared_ptr<RpcClient> client = getRpcClient(target);

  // Capture only `cb`, and by value: this closure is stored on the invocation
  // context and is handed to an asynchronous call, so it may run after this
  // function has returned. A by-reference default capture would make it easy to
  // accidentally reference a dead parameter or local.
  auto callback = [cb](const InvocationContext<QueryAssignmentResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to query assignment. Reason: {}", invocation_context->status.error_message());
      // Every transport-level failure is surfaced to the caller as a timeout.
      std::error_code ec = ErrorCode::RequestTimeout;
      cb(ec, invocation_context->response);
      return;
    }

    auto&& status = invocation_context->response.status();
    // Stays default-constructed (i.e. "no error") for rmq::Code::OK.
    std::error_code ec;
    switch (status.code()) {
      case rmq::Code::OK: {
        SPDLOG_DEBUG("Query assignment OK. Host={}", invocation_context->remote_address);
        break;
      }
      case rmq::Code::ILLEGAL_ACCESS_POINT: {
        SPDLOG_WARN("IllegalAccessPoint: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::IllegalAccessPoint;
        break;
      }
      case rmq::Code::ILLEGAL_TOPIC: {
        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::IllegalTopic;
        break;
      }
      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
        SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::IllegalConsumerGroup;
        break;
      }
      case rmq::Code::CLIENT_ID_REQUIRED: {
        SPDLOG_WARN("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
        // A missing client ID is reported as an internal client error.
        ec = ErrorCode::InternalClientError;
        break;
      }
      case rmq::Code::UNAUTHORIZED: {
        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
        break;
      }
      case rmq::Code::FORBIDDEN: {
        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
        break;
      }
      case rmq::Code::TOPIC_NOT_FOUND: {
        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::TopicNotFound;
        break;
      }
      case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
        SPDLOG_WARN("ConsumerGroupNotFound: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::ConsumerGroupNotFound;
        break;
      }
      case rmq::Code::INTERNAL_SERVER_ERROR: {
        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
        break;
      }
      case rmq::Code::PROXY_TIMEOUT: {
        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::GatewayTimeout;
        break;
      }
      default: {
        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. Host={}", invocation_context->remote_address);
        ec = ErrorCode::NotSupported;
        break;
      }
    }
    cb(ec, invocation_context->response);
  };

  // NOTE(review): the context is handed to the RPC client as a raw pointer and
  // is never deleted here; presumably the RPC layer releases it once the call
  // completes -- confirm against InvocationContext / RpcClient.
  auto invocation_context = new InvocationContext<QueryAssignmentResponse>();
  invocation_context->task_name = fmt::format("QueryAssignment from {}", target);
  invocation_context->remote_address = target;
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
  invocation_context->callback = callback;
  client->asyncQueryAssignment(request, invocation_context);
}