cpp/source/client/ReceiveMessageStreamReader.cpp (128 lines of code) (raw):
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "ReceiveMessageStreamReader.h"
#include <chrono>
#include "apache/rocketmq/v2/definition.pb.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
/**
 * Builds the reader and immediately launches the streaming ReceiveMessage RPC.
 *
 * @param client_manager Weak handle to the client manager; locked later to wrap
 *                       each received protobuf message.
 * @param stub           Messaging service stub used to issue the RPC. Stored as
 *                       a raw pointer, so it is not owned by this object.
 * @param peer_address   Address of the remote peer; recorded as the result's
 *                       source host and as the target endpoint of each message.
 * @param request        Request sent to the server.
 * @param context        Supplies the call metadata, the timeout and the
 *                       completion callback invoked from OnDone().
 */
ReceiveMessageStreamReader::ReceiveMessageStreamReader(std::weak_ptr<ClientManager> client_manager,
                                                       rmq::MessagingService::Stub* stub,
                                                       std::string peer_address,
                                                       rmq::ReceiveMessageRequest request,
                                                       std::unique_ptr<ReceiveMessageContext> context)
    : client_manager_(std::move(client_manager)),
      stub_(stub),
      peer_address_(std::move(peer_address)),
      request_(std::move(request)),
      context_(std::move(context)) {
  // Forward every caller-supplied metadata entry to the gRPC client context.
  for (const auto& entry : context_->metadata) {
    client_context_.AddMetadata(entry.first, entry.second);
  }

  // The deadline is absolute: now plus the timeout configured in the context.
  client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout);

  // Populate the result before the call is activated, so that no member is
  // written from this thread once reactions may start running.
  result_.source_host = peer_address_;

  stub_->async()->ReceiveMessage(&client_context_, &request_, this);

  // Queue the first read BEFORE activating the call. OnDone() executes
  // `delete this`; if the call were started with no read outstanding, it could
  // finish (e.g. fail fast) and destroy this object while the constructor was
  // still about to call StartRead(). Operations issued before StartCall() are
  // queued by gRPC and launched on activation.
  StartRead(&response_);
  StartCall();
}
void ReceiveMessageStreamReader::OnReadDone(bool ok) {
  if (ok) {
    SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone: ok={}", ok);
  } else {
    if (result_.messages.empty() && !ec_) {
      SPDLOG_WARN("ReceiveMessageStreamReader#OnReadDone: ok={}", ok);
      ec_ = ErrorCode::BadGateway;
    } else {
      SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone reached end-of-stream");
    }
    return;
  }
  SPDLOG_DEBUG("ReceiveMessageStreamReader#OnReadDone: response={}", response_.DebugString());
  switch (response_.content_case()) {
    case rmq::ReceiveMessageResponse::ContentCase::kStatus: {
      SPDLOG_DEBUG("ReceiveMessageResponse.status.message={}", response_.status().message());
      switch (response_.status().code()) {
        case rmq::Code::OK: {
          break;
        }
        case rmq::Code::ILLEGAL_TOPIC: {
          ec_ = ErrorCode::IllegalTopic;
          break;
        }
        case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
          ec_ = ErrorCode::IllegalConsumerGroup;
          break;
        }
        case rmq::Code::ILLEGAL_FILTER_EXPRESSION: {
          ec_ = ErrorCode::IllegalFilterExpression;
          break;
        }
        case rmq::Code::CLIENT_ID_REQUIRED: {
          ec_ = ErrorCode::InternalClientError;
          break;
        }
        case rmq::Code::TOPIC_NOT_FOUND: {
          ec_ = ErrorCode::TopicNotFound;
          break;
        }
        case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
          ec_ = ErrorCode::ConsumerGroupNotFound;
          break;
        }
        case rmq::Code::TOO_MANY_REQUESTS: {
          ec_ = ErrorCode::TooManyRequests;
          break;
        }
        case rmq::Code::MESSAGE_NOT_FOUND: {
          ec_ = ErrorCode::NoContent;
          break;
        }
        case rmq::Code::UNAUTHORIZED: {
          ec_ = ErrorCode::Unauthorized;
          break;
        }
        case rmq::Code::FORBIDDEN: {
          ec_ = ErrorCode::Forbidden;
          break;
        }
        case rmq::Code::INTERNAL_SERVER_ERROR: {
          ec_ = ErrorCode::InternalServerError;
          break;
        }
        case rmq::Code::PROXY_TIMEOUT: {
          ec_ = ErrorCode::GatewayTimeout;
          break;
        }
        default: {
          ec_ = ErrorCode::NotSupported;
          SPDLOG_WARN("Unsupported code={}", response_.status().code());
          break;
        }
      }
      break;
    }
    case rmq::ReceiveMessageResponse::ContentCase::kMessage: {
      auto client_manager = client_manager_.lock();
      auto message = client_manager->wrapMessage(response_.message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      if (message) {
        result_.messages.push_back(message);
      }
      break;
    }
    default:
      break;
  }
  StartRead(&response_);
}
/**
 * Final step of the call: logs the outcome, stores the status, hands the
 * accumulated error code and result to the context's callback, and then
 * destroys this reader.
 *
 * @param s Final status of the RPC.
 */
void ReceiveMessageStreamReader::OnDone(const grpc::Status& s) {
  if (s.ok()) {
    SPDLOG_DEBUG("ReceiveMessageStreamReader#OnDone: status.ok={}", s.ok());
  } else {
    SPDLOG_WARN("ReceiveMessageStreamReader#OnDone: status.ok={}, status.error_message={}", s.ok(), s.error_message());
  }

  status_ = s;

  // Deliver whatever was gathered while reading the stream.
  context_->callback(ec_, result_);

  // The reader deletes itself here; no member may be accessed past this line.
  delete this;
}
ROCKETMQ_NAMESPACE_END