in cpp/source/client/ReceiveMessageStreamReader.cpp [50:154]
void ReceiveMessageStreamReader::OnReadDone(bool ok) {
if (ok) {
SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone: ok={}", ok);
} else {
if (result_.messages.empty() && !ec_) {
SPDLOG_WARN("ReceiveMessageStreamReader#OnReadDone: ok={}", ok);
ec_ = ErrorCode::BadGateway;
} else {
SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone reached end-of-stream");
}
return;
}
SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone: response={}", response_.DebugString());
switch (response_.content_case()) {
case rmq::ReceiveMessageResponse::ContentCase::kStatus: {
SPDLOG_DEBUG("ReceiveMessageResponse.status.message={}", response_.status().message());
switch (response_.status().code()) {
case rmq::Code::OK: {
break;
}
case rmq::Code::ILLEGAL_TOPIC: {
ec_ = ErrorCode::IllegalTopic;
break;
}
case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
ec_ = ErrorCode::IllegalConsumerGroup;
break;
}
case rmq::Code::ILLEGAL_FILTER_EXPRESSION: {
ec_ = ErrorCode::IllegalFilterExpression;
break;
}
case rmq::Code::CLIENT_ID_REQUIRED: {
ec_ = ErrorCode::InternalClientError;
break;
}
case rmq::Code::TOPIC_NOT_FOUND: {
ec_ = ErrorCode::TopicNotFound;
break;
}
case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
ec_ = ErrorCode::ConsumerGroupNotFound;
break;
}
case rmq::Code::TOO_MANY_REQUESTS: {
ec_ = ErrorCode::TooManyRequests;
break;
}
case rmq::Code::MESSAGE_NOT_FOUND: {
ec_ = ErrorCode::NoContent;
break;
}
case rmq::Code::UNAUTHORIZED: {
ec_ = ErrorCode::Unauthorized;
break;
}
case rmq::Code::FORBIDDEN: {
ec_ = ErrorCode::Forbidden;
break;
}
case rmq::Code::INTERNAL_SERVER_ERROR: {
ec_ = ErrorCode::InternalServerError;
break;
}
case rmq::Code::PROXY_TIMEOUT: {
ec_ = ErrorCode::GatewayTimeout;
break;
}
default: {
ec_ = ErrorCode::NotSupported;
SPDLOG_WARN("Unsupported code={}", response_.status().code());
break;
}
}
break;
}
case rmq::ReceiveMessageResponse::ContentCase::kMessage: {
auto client_manager = client_manager_.lock();
auto message = client_manager->wrapMessage(response_.message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
if (message) {
result_.messages.push_back(message);
}
break;
}
default:
break;
}
StartRead(&response_);
}