void ReceiveMessageStreamReader::OnReadDone()

in cpp/source/client/ReceiveMessageStreamReader.cpp [50:154]


void ReceiveMessageStreamReader::OnReadDone(bool ok) {
  if (ok) {
    SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone: ok={}", ok);
  } else {
    if (result_.messages.empty() && !ec_) {
      SPDLOG_WARN("ReceiveMessageStreamReader#OnReadDone: ok={}", ok);
      ec_ = ErrorCode::BadGateway;
    } else {
      SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone reached end-of-stream");
    }
    return;
  }

  SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone: response={}", response_.DebugString());
  switch (response_.content_case()) {
    case rmq::ReceiveMessageResponse::ContentCase::kStatus: {
      SPDLOG_DEBUG("ReceiveMessageResponse.status.message={}", response_.status().message());
      switch (response_.status().code()) {
        case rmq::Code::OK: {
          break;
        }
        case rmq::Code::ILLEGAL_TOPIC: {
          ec_ = ErrorCode::IllegalTopic;
          break;
        }

        case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
          ec_ = ErrorCode::IllegalConsumerGroup;
          break;
        }

        case rmq::Code::ILLEGAL_FILTER_EXPRESSION: {
          ec_ = ErrorCode::IllegalFilterExpression;
          break;
        }

        case rmq::Code::CLIENT_ID_REQUIRED: {
          ec_ = ErrorCode::InternalClientError;
          break;
        }

        case rmq::Code::TOPIC_NOT_FOUND: {
          ec_ = ErrorCode::TopicNotFound;
          break;
        }

        case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
          ec_ = ErrorCode::ConsumerGroupNotFound;
          break;
        }

        case rmq::Code::TOO_MANY_REQUESTS: {
          ec_ = ErrorCode::TooManyRequests;
          break;
        }

        case rmq::Code::MESSAGE_NOT_FOUND: {
          ec_ = ErrorCode::NoContent;
          break;
        }

        case rmq::Code::UNAUTHORIZED: {
          ec_ = ErrorCode::Unauthorized;
          break;
        }

        case rmq::Code::FORBIDDEN: {
          ec_ = ErrorCode::Forbidden;
          break;
        }

        case rmq::Code::INTERNAL_SERVER_ERROR: {
          ec_ = ErrorCode::InternalServerError;
          break;
        }

        case rmq::Code::PROXY_TIMEOUT: {
          ec_ = ErrorCode::GatewayTimeout;
          break;
        }

        default: {
          ec_ = ErrorCode::NotSupported;
          SPDLOG_WARN("Unsupported code={}", response_.status().code());
          break;
        }
      }
      break;
    }
    case rmq::ReceiveMessageResponse::ContentCase::kMessage: {
      auto client_manager = client_manager_.lock();
      auto message = client_manager->wrapMessage(response_.message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      if (message) {
        result_.messages.push_back(message);
      }
      break;
    }
    default:
      break;
  }

  StartRead(&response_);
}