in dissociated-ipc/cudf-flight-server.cc [227:258]
arrow::Status send_data_stream(UcxServer::ClientWorker* worker,
const std::string& ident) {
auto it = data_sets_.find(ident);
if (it == data_sets_.end()) {
return arrow::Status::Invalid("data set not found:", ident);
}
ipc::IpcWriteOptions ipc_options;
ipc::DictionaryFieldMapper mapper;
const auto& record_list = it->second;
auto schema = record_list[0]->schema();
ARROW_RETURN_NOT_OK(mapper.AddSchemaFields(*schema));
// start at 1 since schema payload has no body
uint32_t sequence_num = 1;
ipc::IpcPayload payload;
ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
ipc::CollectDictionaries(*record_list[0], mapper));
for (const auto& pair : dictionaries) {
ARROW_RETURN_NOT_OK(
ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options, &payload));
ARROW_RETURN_NOT_OK(write_ipc_body(worker->conn_.get(), payload, sequence_num++));
}
for (const auto& batch : record_list) {
ARROW_RETURN_NOT_OK(ipc::GetRecordBatchPayload(*batch, ipc_options, &payload));
ARROW_RETURN_NOT_OK(write_ipc_body(worker->conn_.get(), payload, sequence_num++));
}
return worker->conn_->Flush();
}