in src/afs.cc [956:1057]
/// Serializes the current SPI result (SPI_tuptable / SPI_processed) as Apache
/// Arrow IPC stream format data into a SharedRingBufferOutputStream built
/// from this object.
///
/// Two consecutive streams are written to the same output:
///   1. a stream holding the schema and a single empty record batch, and
///   2. a stream holding the rows, split into record batches of at most
///      MaxNRowsPerRecordBatch rows each.
///
/// Only INT4OID columns are supported; they are mapped to arrow::int32().
///
/// @return arrow::Status::NotImplemented when a column has any other
///         PostgreSQL type, the first failing Arrow status if building or
///         writing fails, otherwise the result of closing the output stream.
arrow::Status write()
{
	SharedRingBufferOutputStream output(this);
	auto tupdesc = SPI_tuptable->tupdesc;
	const int nAttributes = tupdesc->natts;

	// Translate the tuple descriptor into an Arrow schema.
	std::vector<std::shared_ptr<arrow::Field>> fields;
	fields.reserve(static_cast<size_t>(nAttributes));
	for (int i = 0; i < nAttributes; ++i)
	{
		auto attribute = TupleDescAttr(tupdesc, i);
		std::shared_ptr<arrow::DataType> type;
		switch (attribute->atttypid)
		{
			case INT4OID:
				type = arrow::int32();
				break;
			default:
				return arrow::Status::NotImplemented("Unsupported PostgreSQL type: ",
				                                     attribute->atttypid);
		}
		// A column is nullable in Arrow unless PostgreSQL marks it NOT NULL.
		fields.push_back(
			arrow::field(NameStr(attribute->attname), type, !attribute->attnotnull));
	}
	auto schema = arrow::schema(fields);
	ARROW_ASSIGN_OR_RAISE(
		auto builder,
		arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
	auto option = arrow::ipc::IpcWriteOptions::Defaults();
	option.emit_dictionary_deltas = true;

	// First stream: schema only. An empty record batch is flushed from the
	// builder and written so that the stream carries just the schema.
	ARROW_ASSIGN_OR_RAISE(auto writer,
	                      arrow::ipc::MakeStreamWriter(&output, schema, option));
	ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
	P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
	ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
	P("%s: %s: write: schema: Close", Tag, tag_);
	ARROW_RETURN_NOT_OK(writer->Close());

	// Second stream: the actual rows, written to the same output.
	ARROW_ASSIGN_OR_RAISE(writer,
	                      arrow::ipc::MakeStreamWriter(&output, schema, option));
	// True while the builder holds rows that have not been written yet.
	bool needLastFlush = false;
	for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
	{
		// The tuple counters are 64-bit unsigned values, so they are printed
		// with %llu through an explicit cast instead of %d.
		P("%s: %s: write: data: record batch: %llu/%llu",
		  Tag,
		  tag_,
		  static_cast<unsigned long long>(iTuple),
		  static_cast<unsigned long long>(SPI_processed));
		// The attribute index is an int: natts is an int and the index is
		// handed to APIs that take int field numbers.
		for (int iAttribute = 0; iAttribute < nAttributes; ++iAttribute)
		{
			P("%s: %s: write: data: record batch: %llu/%llu: %d/%d",
			  Tag,
			  tag_,
			  static_cast<unsigned long long>(iTuple),
			  static_cast<unsigned long long>(SPI_processed),
			  iAttribute,
			  nAttributes);
			bool isNull;
			// SPI field numbers are 1-based, hence iAttribute + 1.
			auto datum = SPI_getbinval(SPI_tuptable->vals[iTuple],
			                           tupdesc,
			                           iAttribute + 1,
			                           &isNull);
			if (isNull)
			{
				auto arrayBuilder = builder->GetField(iAttribute);
				ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
			}
			else
			{
				// Every supported column is INT4OID, so every field builder
				// is an Int32Builder.
				auto arrayBuilder =
					builder->GetFieldAs<arrow::Int32Builder>(iAttribute);
				ARROW_RETURN_NOT_OK(arrayBuilder->Append(DatumGetInt32(datum)));
			}
		}
		needLastFlush = true;
		// Emit a record batch each time MaxNRowsPerRecordBatch rows have
		// been accumulated.
		if (((iTuple + 1) % MaxNRowsPerRecordBatch) == 0)
		{
			ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
			P("%s: %s: write: data: WriteRecordBatch: %llu/%llu",
			  Tag,
			  tag_,
			  static_cast<unsigned long long>(iTuple),
			  static_cast<unsigned long long>(SPI_processed));
			ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
			needLastFlush = false;
		}
	}
	// Write the trailing partial batch, if any rows are still buffered.
	if (needLastFlush)
	{
		ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
		P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
		ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
	}
	P("%s: %s: write: data: Close", Tag, tag_);
	ARROW_RETURN_NOT_OK(writer->Close());
	return output.Close();
}