arrow::Status write()

in src/afs.cc [956:1057]

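This method converts the current SPI result set (SPI_processed rows in SPI_tuptable) into Apache Arrow IPC stream format and writes it through a SharedRingBufferOutputStream. It first writes a schema-only stream, then a second stream whose record batches hold at most MaxNRowsPerRecordBatch rows each; only the PostgreSQL int4 type (INT4OID) is mapped to an Arrow type, so any other column type returns arrow::Status::NotImplemented.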

	arrow::Status write()
	{
		SharedRingBufferOutputStream output(this);
		std::vector<std::shared_ptr<arrow::Field>> fields;
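		// Build the Arrow schema from the SPI tuple descriptor, mapping each
		// PostgreSQL column type to an Arrow type.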
		for (int i = 0; i < SPI_tuptable->tupdesc->natts; ++i)
		{
			auto attribute = TupleDescAttr(SPI_tuptable->tupdesc, i);
			std::shared_ptr<arrow::DataType> type;
			switch (attribute->atttypid)
			{
				case INT4OID:
					type = arrow::int32();
					break;
				default:
					return arrow::Status::NotImplemented("Unsupported PostgreSQL type: ",
					                                     attribute->atttypid);
			}
			fields.push_back(
				arrow::field(NameStr(attribute->attname), type, !attribute->attnotnull));
		}
		auto schema = arrow::schema(fields);
		ARROW_ASSIGN_OR_RAISE(
			auto builder,
			arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
		auto option = arrow::ipc::IpcWriteOptions::Defaults();
		option.emit_dictionary_deltas = true;
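		// Emit dictionary deltas when a dictionary changes between record
		// batches (off by default for maximum stream compatibility).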

		// Write a schema-only stream first so that just the schema can be returned.
		ARROW_ASSIGN_OR_RAISE(auto writer,
		                      arrow::ipc::MakeStreamWriter(&output, schema, option));
		// Flush the builder to get an empty record batch; writing it emits the schema.
		ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
		P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
		ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
		P("%s: %s: write: schema: Close", Tag, tag_);
		ARROW_RETURN_NOT_OK(writer->Close());

		// Write a second stream that carries the actual record batches.
		ARROW_ASSIGN_OR_RAISE(writer,
		                      arrow::ipc::MakeStreamWriter(&output, schema, option));
		bool needLastFlush = false;
		for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
		{
			P("%s: %s: write: data: record batch: %d/%d",
			  Tag,
			  tag_,
			  iTuple,
			  SPI_processed);
			for (uint64_t iAttribute = 0; iAttribute < SPI_tuptable->tupdesc->natts;
			     ++iAttribute)
			{
				P("%s: %s: write: data: record batch: %d/%d: %d/%d",
				  Tag,
				  tag_,
				  iTuple,
				  SPI_processed,
				  iAttribute,
				  SPI_tuptable->tupdesc->natts);
				bool isNull;
				auto datum = SPI_getbinval(SPI_tuptable->vals[iTuple],
				                           SPI_tuptable->tupdesc,
				                           iAttribute + 1,
				                           &isNull);
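				// A NULL value becomes a null in the Arrow array; otherwise the
				// int4 datum is appended as int32.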
				if (isNull)
				{
					auto arrayBuilder = builder->GetField(iAttribute);
					ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
				}
				else
				{
					auto arrayBuilder =
						builder->GetFieldAs<arrow::Int32Builder>(iAttribute);
					ARROW_RETURN_NOT_OK(arrayBuilder->Append(DatumGetInt32(datum)));
				}
			}

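			// Write a record batch every MaxNRowsPerRecordBatch rows; otherwise
			// remember that a final flush is still needed.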
			if (((iTuple + 1) % MaxNRowsPerRecordBatch) == 0)
			{
				ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
				P("%s: %s: write: data: WriteRecordBatch: %d/%d",
				  Tag,
				  tag_,
				  iTuple,
				  SPI_processed);
				ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
				needLastFlush = false;
			}
			else
			{
				needLastFlush = true;
			}
		}
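		// Write the remaining rows that did not fill a complete record batch.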
		if (needLastFlush)
		{
			ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
			P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
			ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
		}
		P("%s: %s: write: data: Close", Tag, tag_);
		ARROW_RETURN_NOT_OK(writer->Close());
		return output.Close();
	}
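
Because write() emits two back-to-back IPC streams into the same output (the schema-only stream, then the data stream), a consumer has to open two stream readers in sequence on the same input. The following is a minimal consumer-side sketch of that pattern, assuming the produced bytes are available in a single arrow::Buffer rather than in the shared ring buffer used by afs.cc; readResult() and its buffer parameter are hypothetical, only the Arrow IPC calls are real.

	// Needs <arrow/api.h>, <arrow/io/memory.h>, and <arrow/ipc/reader.h>.
	arrow::Result<std::shared_ptr<arrow::Table>> readResult(
		const std::shared_ptr<arrow::Buffer>& buffer)
	{
		auto input = std::make_shared<arrow::io::BufferReader>(buffer);
		// First stream: the schema plus one empty record batch.
		ARROW_ASSIGN_OR_RAISE(auto schemaReader,
		                      arrow::ipc::RecordBatchStreamReader::Open(input));
		// Draining it moves the input past the end-of-stream marker.
		ARROW_RETURN_NOT_OK(schemaReader->ToTable());
		// Second stream: the record batches that carry the actual rows.
		ARROW_ASSIGN_OR_RAISE(auto dataReader,
		                      arrow::ipc::RecordBatchStreamReader::Open(input));
		return dataReader->ToTable();
	}

Reopening a reader on the same input works because each IPC stream is terminated by an end-of-stream marker, which the first reader consumes before the second one starts reading.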