in congestion_control/CongestionControlRPCEnv.cpp [59:125]
void CongestionControlRPCEnv::loop(const std::string &address) {
std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(address, grpc::InsecureChannelCredentials());
auto stub = RPC::NewStub(channel);
LOG(INFO) << "Connecting to ActorPoolServer at " << address << " ...";
const auto &deadline = std::chrono::system_clock::now() + kConnectTimeout;
if (!channel->WaitForConnected(deadline)) {
LOG(FATAL) << "Timed out connecting to ActorPoolServer: " << address;
}
// Notify that we are connected
{
std::lock_guard<std::mutex> g(mutex_);
connected_ = true;
}
cv_.notify_one();
LOG(INFO) << "Connected to ActorPoolServer: " << address;
grpc::ClientContext context;
std::shared_ptr<grpc::ClientReaderWriter<CallRequest, CallResponse>> stream(
stub->Call(&context));
Action action;
bool done = true;
uint32_t episode_step = 0;
float episode_return = 0.0;
CallResponse resp;
std::unique_lock<std::mutex> lock(mutex_);
while (!shutdown_) {
cv_.wait(lock, [&]() -> bool { return (observationReady_ || shutdown_); });
if (shutdown_) {
LOG(INFO) << "RPC env loop terminating";
const auto &status = stream->Finish();
if (!status.ok()) {
LOG(ERROR) << "RPC env loop failed on finish.";
}
return;
}
// The lifetime of a connection is seen as a single episode, so
// done is set to true only at the beginning of the episode (to mark
// the end of the previous episode. Episodic training should be
// implemented via resetting the entire connection.
done = (episode_step == 0);
episode_return += reward_;
VLOG(2) << "Episode step = " << episode_step
<< ", total return = " << episode_return;
const auto &req = makeCallRequest(actorId_, tensor_, reward_, done);
observationReady_ = false; // Back to waiting
stream->Write(req);
if (!stream->Read(&resp)) {
LOG(FATAL) << "Read failed from gRPC server.";
}
if (resp.has_error()) {
LOG(FATAL) << "Error in response from RL server: "
<< resp.error().message();
}
action.cwndAction = getActionFromCallResponse(resp);
onAction(action);
episode_step++;
}
}