arrow::Status write_record_batches()

in src/afs.cc [1931:2044]


	// Serializes the current SPI result (SPI_tuptable / SPI_processed) into a
	// SharedRingBufferOutputStream as two consecutive Arrow IPC streams:
	//
	//   1. a schema-only stream that carries a single empty record batch, and
	//   2. a data stream whose record batches each cover at most
	//      MaxNRowsPerRecordBatch tuples.
	//
	// The Arrow schema is derived from the attributes of SPI_tuptable->tupdesc;
	// a field is nullable unless the attribute is declared NOT NULL.
	//
	// tag: caller-supplied label that is only used in the log messages.
	//
	// Returns the first non-OK status produced while building or writing,
	// otherwise the status of closing the output stream.
	arrow::Status write_record_batches(const char* tag)
	{
		SharedRingBufferOutputStream output(this, localSession_);

		auto tupdesc = SPI_tuptable->tupdesc;
		const int nAttributes = tupdesc->natts;

		// Map every result attribute to an Arrow field.
		std::vector<std::shared_ptr<arrow::Field>> fields;
		fields.reserve(nAttributes);
		for (int i = 0; i < nAttributes; ++i)
		{
			auto attribute = TupleDescAttr(tupdesc, i);
			ARROW_ASSIGN_OR_RAISE(auto type,
			                      ArrowArrayBuilderBase::arrow_type(attribute));
			fields.push_back(arrow::field(
				NameStr(attribute->attname), std::move(type), !attribute->attnotnull));
		}
		auto schema = arrow::schema(fields);
		ARROW_ASSIGN_OR_RAISE(
			auto builder,
			arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));

		// One per-column builder wrapper on top of the record batch builder's
		// field builders.
		std::vector<std::unique_ptr<ArrowArrayBuilderBase>> builders;
		builders.reserve(nAttributes);
		for (int i = 0; i < nAttributes; ++i)
		{
			auto attribute = TupleDescAttr(tupdesc, i);
			ARROW_ASSIGN_OR_RAISE(
				auto array_builder,
				ArrowArrayBuilderBase::make(attribute, i, builder->GetField(i)));
			builders.push_back(std::move(array_builder));
		}

		auto options = arrow::ipc::IpcWriteOptions::Defaults();
		options.emit_dictionary_deltas = true;

		// Write schema only stream format data to return only schema.
		ARROW_ASSIGN_OR_RAISE(auto writer,
		                      arrow::ipc::MakeStreamWriter(&output, schema, options));
		// Build an empty record batch to write schema.
		ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
		P("%s: %s: %s: write: schema: WriteRecordBatch", Tag, tag_, tag);
		ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
		P("%s: %s: %s: write: schema: Close", Tag, tag_, tag);
		ARROW_RETURN_NOT_OK(writer->Close());

		// Write another stream format data with record batches.
		ARROW_ASSIGN_OR_RAISE(writer,
		                      arrow::ipc::MakeStreamWriter(&output, schema, options));
		// Each iteration emits the tuples in [iTuple, iTupleEnd). The end index
		// is clamped to SPI_processed, so the final (possibly partial) batch is
		// written by this loop as well and no separate trailing pass is needed.
		for (uint64_t iTuple = 0; iTuple < SPI_processed;
		     iTuple += MaxNRowsPerRecordBatch)
		{
			uint64_t iTupleEnd = iTuple + MaxNRowsPerRecordBatch;
			if (iTupleEnd >= SPI_processed)
			{
				iTupleEnd = SPI_processed;
			}
			P("%s: %s: %s: write: data: record batch: %" PRIu64 "/%" PRIu64,
			  Tag,
			  tag_,
			  tag,
			  iTuple,
			  iTupleEnd);
			for (int iAttribute = 0; iAttribute < nAttributes; ++iAttribute)
			{
				P("%s: %s: %s: write: data: record batch: %" PRIu64 "/%" PRIu64 ": %d/%d",
				  Tag,
				  tag_,
				  tag,
				  iTuple,
				  iTupleEnd,
				  iAttribute,
				  nAttributes);
				ARROW_RETURN_NOT_OK(builders[iAttribute]->build(iTuple, iTupleEnd));
			}
			ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
			P("%s: %s: %s: write: data: WriteRecordBatch: %" PRIu64 "/%" PRIu64,
			  Tag,
			  tag_,
			  tag,
			  iTuple,
			  iTupleEnd);
			ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
		}

		P("%s: %s: %s, write: data: Close", Tag, tag_, tag);
		ARROW_RETURN_NOT_OK(writer->Close());
		return output.Close();
	}