in dissociated-ipc/cudf-flight-server.cc [138:182]
arrow::Status send_metadata_stream(UcxServer::ClientWorker* worker,
const std::string& ident) {
auto it = data_sets_.find(ident);
if (it == data_sets_.end()) {
return arrow::Status::Invalid("data set not found:", ident);
}
ipc::IpcWriteOptions ipc_options;
ipc::DictionaryFieldMapper mapper;
const auto& record_list = it->second;
auto schema = record_list[0]->schema();
ARROW_RETURN_NOT_OK(mapper.AddSchemaFields(*schema));
// for each record in the stream, collect the IPC metadata to send
uint32_t sequence_num = 0;
// schema payload is first
ipc::IpcPayload payload;
ARROW_RETURN_NOT_OK(ipc::GetSchemaPayload(*schema, ipc_options, mapper, &payload));
ARROW_RETURN_NOT_OK(write_ipc_metadata(worker->conn_.get(), payload, sequence_num++));
// then any dictionaries
ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
ipc::CollectDictionaries(*record_list[0], mapper));
for (const auto& pair : dictionaries) {
ARROW_RETURN_NOT_OK(
ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options, &payload));
ARROW_RETURN_NOT_OK(
write_ipc_metadata(worker->conn_.get(), payload, sequence_num++));
}
// finally the record batch metadata messages
for (const auto& batch : record_list) {
ARROW_RETURN_NOT_OK(ipc::GetRecordBatchPayload(*batch, ipc_options, &payload));
ARROW_RETURN_NOT_OK(
write_ipc_metadata(worker->conn_.get(), payload, sequence_num++));
}
// finally, we send the End-Of-Stream message
std::array<uint8_t, 5> eos_bytes{static_cast<uint8_t>(MetadataMsgType::EOS), 0, 0, 0,
0};
utils::Uint32ToBytesLE(sequence_num, eos_bytes.data() + 1);
ARROW_RETURN_NOT_OK(worker->conn_->Flush());
return worker->conn_->SendAM(0, eos_bytes.data(), eos_bytes.size());
}