in src/afs.cc [1931:2044]
/// Serialize the current SPI result (SPI_tuptable / SPI_processed) as Arrow
/// IPC stream-format data into a SharedRingBufferOutputStream bound to
/// localSession_.
///
/// Two consecutive IPC streams are written to the same output:
///   1. a schema-only stream: the schema followed by one empty record batch,
///      then closed;
///   2. a data stream: all SPI_processed rows, split into record batches of
///      at most MaxNRowsPerRecordBatch rows each, then closed.
///
/// @param tag  Label used only in the P() trace messages.
/// @return     The first non-OK status raised while building/writing, or
///             otherwise the status returned by closing the output stream.
arrow::Status write_record_batches(const char* tag)
{
	SharedRingBufferOutputStream output(this, localSession_);
	const auto tupdesc = SPI_tuptable->tupdesc;
	const int nAttributes = tupdesc->natts;

	// One Arrow field per result column; NOT NULL columns become
	// non-nullable fields.
	std::vector<std::shared_ptr<arrow::Field>> fields;
	fields.reserve(nAttributes);
	for (int i = 0; i < nAttributes; ++i)
	{
		auto attribute = TupleDescAttr(tupdesc, i);
		ARROW_ASSIGN_OR_RAISE(auto type,
		                      ArrowArrayBuilderBase::arrow_type(attribute));
		fields.push_back(arrow::field(
			NameStr(attribute->attname), std::move(type), !attribute->attnotnull));
	}
	auto schema = arrow::schema(fields);
	ARROW_ASSIGN_OR_RAISE(
		auto builder,
		arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));

	// One column builder per attribute, each bound to the matching field
	// builder of the RecordBatchBuilder.
	std::vector<std::unique_ptr<ArrowArrayBuilderBase>> builders;
	builders.reserve(nAttributes);
	for (int i = 0; i < nAttributes; ++i)
	{
		auto attribute = TupleDescAttr(tupdesc, i);
		ARROW_ASSIGN_OR_RAISE(
			auto array_builder,
			ArrowArrayBuilderBase::make(attribute, i, builder->GetField(i)));
		builders.push_back(std::move(array_builder));
	}

	auto options = arrow::ipc::IpcWriteOptions::Defaults();
	options.emit_dictionary_deltas = true;

	// Write schema only stream format data to return only schema.
	ARROW_ASSIGN_OR_RAISE(auto writer,
	                      arrow::ipc::MakeStreamWriter(&output, schema, options));
	// Nothing has been appended yet, so Flush() yields an empty record batch.
	ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
	P("%s: %s: %s: write: schema: WriteRecordBatch", Tag, tag_, tag);
	ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
	P("%s: %s: %s: write: schema: Close", Tag, tag_, tag);
	ARROW_RETURN_NOT_OK(writer->Close());

	// Write another stream format data with record batches.
	ARROW_ASSIGN_OR_RAISE(writer,
	                      arrow::ipc::MakeStreamWriter(&output, schema, options));
	// Each iteration emits rows [iTuple, iTupleEnd). The end is clamped to
	// SPI_processed, so the final partial batch is handled here too and no
	// separate "tail" pass is needed after the loop.
	for (uint64_t iTuple = 0; iTuple < SPI_processed;
	     iTuple += MaxNRowsPerRecordBatch)
	{
		uint64_t iTupleEnd = iTuple + MaxNRowsPerRecordBatch;
		if (iTupleEnd > SPI_processed)
		{
			iTupleEnd = SPI_processed;
		}
		P("%s: %s: %s: write: data: record batch: %" PRIu64 "/%" PRIu64,
		  Tag,
		  tag_,
		  tag,
		  iTuple,
		  iTupleEnd);
		for (int iAttribute = 0; iAttribute < nAttributes; ++iAttribute)
		{
			P("%s: %s: %s: write: data: record batch: %" PRIu64 "/%" PRIu64 ": %d/%d",
			  Tag,
			  tag_,
			  tag,
			  iTuple,
			  iTupleEnd,
			  iAttribute,
			  nAttributes);
			ARROW_RETURN_NOT_OK(builders[iAttribute]->build(iTuple, iTupleEnd));
		}
		ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
		P("%s: %s: %s: write: data: WriteRecordBatch: %" PRIu64 "/%" PRIu64,
		  Tag,
		  tag_,
		  tag,
		  iTuple,
		  iTupleEnd);
		ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
	}
	P("%s: %s: %s, write: data: Close", Tag, tag_, tag);
	ARROW_RETURN_NOT_OK(writer->Close());
	return output.Close();
}