void ClientManagerImpl::heartbeat()

in cpp/source/client/ClientManagerImpl.cpp [175:261]


void ClientManagerImpl::heartbeat(const std::string& target_host,
                                  const Metadata& metadata,
                                  const HeartbeatRequest& request,
                                  std::chrono::milliseconds timeout,
                                  const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) {
  SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.ShortDebugString());
  auto client = getRpcClient(target_host, true);
  auto invocation_context = new InvocationContext<HeartbeatResponse>();
  invocation_context->task_name = fmt::format("Heartbeat to {}", target_host);
  invocation_context->remote_address = target_host;
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  auto callback = [cb](const InvocationContext<HeartbeatResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to send heartbeat to target_host={}. gRPC code: {}, message: {}",
                  invocation_context->remote_address, invocation_context->status.error_code(),
                  invocation_context->status.error_message());
      std::error_code ec = ErrorCode::RequestTimeout;
      cb(ec, invocation_context->response);
      return;
    }

    auto&& status = invocation_context->response.status();
    std::error_code ec;
    switch (status.code()) {
      case rmq::Code::OK: {
        cb(ec, invocation_context->response);
        break;
      }

      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
        SPDLOG_ERROR("IllegalConsumerGroup: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::IllegalConsumerGroup;
        break;
      }

      case rmq::Code::TOO_MANY_REQUESTS: {
        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::TooManyRequests;
        cb(ec, invocation_context->response);
        break;
      }

      case rmq::Code::UNAUTHORIZED: {
        SPDLOG_WARN("Unauthorized: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
        cb(ec, invocation_context->response);
        break;
      }

      case rmq::Code::UNRECOGNIZED_CLIENT_TYPE: {
        SPDLOG_ERROR("UnsupportedClientType: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::UnsupportedClientType;
        cb(ec, invocation_context->response);
        break;
      }

      case rmq::Code::CLIENT_ID_REQUIRED: {
        SPDLOG_ERROR("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::InternalClientError;
        cb(ec, invocation_context->response);
        break;
      }

      case rmq::Code::INTERNAL_SERVER_ERROR: {
        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
        cb(ec, invocation_context->response);
        break;
      }

      default: {
        SPDLOG_WARN("NotSupported: Please upgrade SDK to latest release. Message={}, host={}", status.message(),
                    invocation_context->remote_address);
        ec = ErrorCode::NotSupported;
        cb(ec, invocation_context->response);
        break;
      }
    }
  };

  invocation_context->callback = callback;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
  client->asyncHeartbeat(request, invocation_context);
}