in dissociated-ipc/cudf-flight-client.cc [203:262]
void run_meta_loop() {
while (!finished_metadata_.load()) {
// progress the connection until we get an event
while (ctrl_cnxn_->Progress()) {
}
{
std::unique_lock<std::mutex> guard(queue_mutex_);
while (!metadata_queue_.empty()) {
// handle any metadata messages in our queue
auto buf = std::move(metadata_queue_.front());
metadata_queue_.pop();
guard.unlock();
while (buf.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
ctrl_cnxn_->Progress();
}
std::shared_ptr<arrow::Buffer> buffer = buf.get();
if (static_cast<MetadataMsgType>(buffer->data()[0]) == MetadataMsgType::EOS) {
finished_metadata_.store(true);
guard.lock();
continue;
}
uint32_t sequence_number = utils::BytesToUint32LE(buffer->data() + 1);
auto metadata = SliceBuffer(buffer, 5, buffer->size() - 5);
// store a mapping of sequence numbers to std::future that returns the data
std::promise<std::unique_ptr<ipc::Message>> p;
{
std::lock_guard<std::mutex> lock(msg_mutex_);
msg_map_.insert({sequence_number, p.get_future()});
}
cv_progress_.notify_all();
auto msg = ipc::Message::Open(metadata, nullptr).ValueOrDie();
if (!ipc::Message::HasBody(msg->type())) {
p.set_value(std::move(msg));
guard.lock();
continue;
}
{
std::lock_guard<std::mutex> lock(polling_mutex_);
polling_map_.insert(
{sequence_number, PendingMsg{std::move(p), std::move(metadata)}});
}
guard.lock();
}
}
if (finished_metadata_.load()) break;
auto status = utils::FromUcsStatus("ucp_worker_wait", ctrl_cnxn_->WorkerWait());
if (!status.ok()) {
ARROW_LOG(ERROR) << status.ToString();
return;
}
}
}