arrow::Status send_metadata_stream()

in dissociated-ipc/cudf-flight-server.cc [138:182]


  /// \brief Send the IPC metadata for every message of a stored data set to a client.
  ///
  /// Looks up `ident` in `data_sets_` and writes, in order, the schema payload,
  /// one payload per dictionary collected from the first record batch, and one
  /// payload per record batch. Each payload is handed to `write_ipc_metadata`
  /// together with a sequence number that starts at 0 and increases by one per
  /// message. After the connection has been flushed, a 5-byte End-Of-Stream
  /// message is sent: the `MetadataMsgType::EOS` tag byte followed by the next
  /// sequence number as written by `utils::Uint32ToBytesLE`.
  ///
  /// \param worker client worker whose `conn_` is used to send the messages
  /// \param ident  key of the data set to stream
  /// \return `Status::Invalid` if `ident` is unknown or its data set holds no
  ///         record batches; otherwise the first non-OK status produced while
  ///         building or sending a message, or the status of the final EOS send.
  arrow::Status send_metadata_stream(UcxServer::ClientWorker* worker,
                                     const std::string& ident) {
    auto it = data_sets_.find(ident);
    if (it == data_sets_.end()) {
      return arrow::Status::Invalid("data set not found:", ident);
    }

    const auto& record_list = it->second;
    // The schema and the dictionaries are both taken from the first batch, so an
    // empty data set must be rejected before `record_list[0]` is touched.
    if (record_list.empty()) {
      return arrow::Status::Invalid("data set is empty: ", ident);
    }

    ipc::IpcWriteOptions ipc_options;
    ipc::DictionaryFieldMapper mapper;
    auto schema = record_list[0]->schema();
    ARROW_RETURN_NOT_OK(mapper.AddSchemaFields(*schema));

    // Every message below goes out on the same connection.
    auto* conn = worker->conn_.get();

    // Each metadata message carries its position in the stream.
    uint32_t sequence_num = 0;

    // The schema payload is first.
    ipc::IpcPayload payload;
    ARROW_RETURN_NOT_OK(ipc::GetSchemaPayload(*schema, ipc_options, mapper, &payload));
    ARROW_RETURN_NOT_OK(write_ipc_metadata(conn, payload, sequence_num++));

    // Then any dictionaries.
    // NOTE(review): dictionaries are collected from the first batch only; this
    // presumably assumes all batches of a data set share them -- confirm.
    ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
                          ipc::CollectDictionaries(*record_list[0], mapper));
    for (const auto& pair : dictionaries) {
      ARROW_RETURN_NOT_OK(
          ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options, &payload));
      ARROW_RETURN_NOT_OK(write_ipc_metadata(conn, payload, sequence_num++));
    }

    // Then the metadata message of every record batch.
    for (const auto& batch : record_list) {
      ARROW_RETURN_NOT_OK(ipc::GetRecordBatchPayload(*batch, ipc_options, &payload));
      ARROW_RETURN_NOT_OK(write_ipc_metadata(conn, payload, sequence_num++));
    }

    // Finally the End-Of-Stream message: one type byte followed by the 4-byte
    // sequence number.
    std::array<uint8_t, 5> eos_bytes{static_cast<uint8_t>(MetadataMsgType::EOS), 0, 0, 0,
                                     0};
    utils::Uint32ToBytesLE(sequence_num, eos_bytes.data() + 1);

    // Flush the connection before the EOS marker is sent.
    ARROW_RETURN_NOT_OK(conn->Flush());
    return conn->SendAM(0, eos_bytes.data(), eos_bytes.size());
  }