void CongestionControlRPCEnv::loop()

in congestion_control/CongestionControlRPCEnv.cpp [59:125]


void CongestionControlRPCEnv::loop(const std::string &address) {
  std::shared_ptr<grpc::Channel> channel =
      grpc::CreateChannel(address, grpc::InsecureChannelCredentials());
  auto stub = RPC::NewStub(channel);

  LOG(INFO) << "Connecting to ActorPoolServer at " << address << " ...";
  const auto &deadline = std::chrono::system_clock::now() + kConnectTimeout;
  if (!channel->WaitForConnected(deadline)) {
    LOG(FATAL) << "Timed out connecting to ActorPoolServer: " << address;
  }

  // Notify that we are connected
  {
    std::lock_guard<std::mutex> g(mutex_);
    connected_ = true;
  }
  cv_.notify_one();
  LOG(INFO) << "Connected to ActorPoolServer: " << address;

  grpc::ClientContext context;
  std::shared_ptr<grpc::ClientReaderWriter<CallRequest, CallResponse>> stream(
      stub->Call(&context));

  Action action;
  bool done = true;
  uint32_t episode_step = 0;
  float episode_return = 0.0;
  CallResponse resp;
  std::unique_lock<std::mutex> lock(mutex_);

  while (!shutdown_) {
    cv_.wait(lock, [&]() -> bool { return (observationReady_ || shutdown_); });
    if (shutdown_) {
      LOG(INFO) << "RPC env loop terminating";
      const auto &status = stream->Finish();
      if (!status.ok()) {
        LOG(ERROR) << "RPC env loop failed on finish.";
      }
      return;
    }

    // The lifetime of a connection is seen as a single episode, so
    // done is set to true only at the beginning of the episode (to mark
    // the end of the previous episode. Episodic training should be
    // implemented via resetting the entire connection.
    done = (episode_step == 0);
    episode_return += reward_;
    VLOG(2) << "Episode step = " << episode_step
            << ", total return = " << episode_return;

    const auto &req = makeCallRequest(actorId_, tensor_, reward_, done);
    observationReady_ = false; // Back to waiting

    stream->Write(req);
    if (!stream->Read(&resp)) {
      LOG(FATAL) << "Read failed from gRPC server.";
    }
    if (resp.has_error()) {
      LOG(FATAL) << "Error in response from RL server: "
                 << resp.error().message();
    }
    action.cwndAction = getActionFromCallResponse(resp);
    onAction(action);

    episode_step++;
  }
}