arrow::Status send_data_stream()

in dissociated-ipc/cudf-flight-server.cc [227:258]


  // Streams the data set registered under `ident` to the client attached to
  // `worker`.
  //
  // Dictionary payloads collected from the first record batch are written
  // first, then one payload per record batch in stored order. Every body is
  // tagged with an increasing sequence number, and the connection is flushed
  // once all payloads have been written.
  //
  // Returns Invalid when `ident` is unknown or when the data set holds no
  // record batches; otherwise propagates the first failure from payload
  // construction, body writing or the final flush.
  arrow::Status send_data_stream(UcxServer::ClientWorker* worker,
                                 const std::string& ident) {
    auto it = data_sets_.find(ident);
    if (it == data_sets_.end()) {
      return arrow::Status::Invalid("data set not found:", ident);
    }

    const auto& record_list = it->second;
    // The schema and dictionaries are taken from the first batch, so an empty
    // data set must be rejected before it is indexed.
    if (record_list.empty()) {
      return arrow::Status::Invalid("data set is empty: ", ident);
    }
    const auto& first_batch = record_list[0];

    ipc::IpcWriteOptions ipc_options;
    ipc::DictionaryFieldMapper mapper;
    auto schema = first_batch->schema();
    ARROW_RETURN_NOT_OK(mapper.AddSchemaFields(*schema));

    // start at 1 since schema payload has no body
    uint32_t sequence_num = 1;

    auto* conn = worker->conn_.get();
    ipc::IpcPayload payload;

    // NOTE(review): dictionaries are collected from the first batch only;
    // presumably later batches share them -- confirm against the data sets
    // that get registered.
    ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
                          ipc::CollectDictionaries(*first_batch, mapper));
    for (const auto& pair : dictionaries) {
      ARROW_RETURN_NOT_OK(
          ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options, &payload));
      ARROW_RETURN_NOT_OK(write_ipc_body(conn, payload, sequence_num++));
    }

    for (const auto& batch : record_list) {
      ARROW_RETURN_NOT_OK(ipc::GetRecordBatchPayload(*batch, ipc_options, &payload));
      ARROW_RETURN_NOT_OK(write_ipc_body(conn, payload, sequence_num++));
    }

    return worker->conn_->Flush();
  }