void run_data_loop()

in dissociated-ipc/cudf-flight-client.cc [110:159]


  void run_data_loop() {
    if (arrow::cuda::IsCudaMemoryManager(*mm_)) {
      // since we're in a new thread, we need to make sure to push the cuda context
      // so that ucx uses the same cuda context as the Arrow data is using, otherwise
      // the device pointers aren't valid
      auto ctx = *(*arrow::cuda::AsCudaMemoryManager(mm_))->cuda_device()->GetContext();
      cuCtxPushCurrent(reinterpret_cast<CUcontext>(ctx->handle()));
    }

    while (true) {
      // progress the connection until an event happens
      while (data_cnxn_->Progress()) {
      }
      {
        // check if we have received any metadata which indicate we need to poll
        // for a corresponding tagged data message
        std::unique_lock<std::mutex> guard(polling_mutex_);
        for (auto it = polling_map_.begin(); it != polling_map_.end();) {
          auto maybe_tag =
              data_cnxn_->ProbeForTag(ucp_tag_t(it->first), 0x00000000FFFFFFFF, 1);
          if (!maybe_tag.ok()) {
            ARROW_LOG(ERROR) << maybe_tag.status().ToString();
            return;
          }

          auto tag_pair = maybe_tag.MoveValueUnsafe();
          if (tag_pair.second != nullptr) {
            // got one!
            auto st = RecvTag(tag_pair.second, tag_pair.first, std::move(it->second));
            if (!st.ok()) {
              ARROW_LOG(ERROR) << st.ToString();
              return;
            }
            it = polling_map_.erase(it);
          } else {
            ++it;
          }
        }
      }

      // if the metadata stream has ended...
      if (finished_metadata_.load()) {
        // we are done if there's nothing left to poll for and nothing outstanding
        std::lock_guard<std::mutex> guard(polling_mutex_);
        if (polling_map_.empty() && outstanding_tags_.load() == 0) {
          break;
        }
      }
    }
  }