void run_meta_loop()

in dissociated-ipc/cudf-flight-client.cc [203:262]


  void run_meta_loop() {
    while (!finished_metadata_.load()) {
      // progress the connection until we get an event
      while (ctrl_cnxn_->Progress()) {
      }
      {
        std::unique_lock<std::mutex> guard(queue_mutex_);
        while (!metadata_queue_.empty()) {
          // handle any metadata messages in our queue
          auto buf = std::move(metadata_queue_.front());
          metadata_queue_.pop();
          guard.unlock();

          while (buf.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
            ctrl_cnxn_->Progress();
          }

          std::shared_ptr<arrow::Buffer> buffer = buf.get();
          if (static_cast<MetadataMsgType>(buffer->data()[0]) == MetadataMsgType::EOS) {
            finished_metadata_.store(true);
            guard.lock();
            continue;
          }

          uint32_t sequence_number = utils::BytesToUint32LE(buffer->data() + 1);
          auto metadata = SliceBuffer(buffer, 5, buffer->size() - 5);

          // store a mapping of sequence numbers to std::future that returns the data
          std::promise<std::unique_ptr<ipc::Message>> p;
          {
            std::lock_guard<std::mutex> lock(msg_mutex_);
            msg_map_.insert({sequence_number, p.get_future()});
          }
          cv_progress_.notify_all();

          auto msg = ipc::Message::Open(metadata, nullptr).ValueOrDie();
          if (!ipc::Message::HasBody(msg->type())) {
            p.set_value(std::move(msg));
            guard.lock();
            continue;
          }

          {
            std::lock_guard<std::mutex> lock(polling_mutex_);
            polling_map_.insert(
                {sequence_number, PendingMsg{std::move(p), std::move(metadata)}});
          }

          guard.lock();
        }
      }

      if (finished_metadata_.load()) break;
      auto status = utils::FromUcsStatus("ucp_worker_wait", ctrl_cnxn_->WorkerWait());
      if (!status.ok()) {
        ARROW_LOG(ERROR) << status.ToString();
        return;
      }
    }
  }