in src/afs.cc [1359:1384]
arrow::Result<int64_t> update(std::shared_ptr<SharedRingBufferInputStream>& input)
{
ARROW_ASSIGN_OR_RAISE(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(input));
SPIExecuteOptions options = {};
std::vector<Oid> pgTypes;
ARROW_RETURN_NOT_OK(prepare(options, pgTypes, reader->schema()));
auto plan = SPI_prepare(query_.c_str(), pgTypes.size(), pgTypes.data());
ScopedPlan scopedPlan(plan);
int64_t nUpdatedRecords = 0;
while (true)
{
std::shared_ptr<arrow::RecordBatch> recordBatch;
ARROW_RETURN_NOT_OK(reader->ReadNext(&recordBatch));
if (!recordBatch)
{
break;
}
ARROW_RETURN_NOT_OK(execute(plan, recordBatch, options, [&nUpdatedRecords]() {
nUpdatedRecords += SPI_processed;
return arrow::Status::OK();
}));
}
return nUpdatedRecords;
}