in src/afs.cc [3441:3499]
/// Handles an "update prepared statement" call for one session.
///
/// Steps, in order:
///   1. Look up the session for `sessionID` and queue an
///      UpdatePreparedStatementRequest for `handle`.
///   2. Send SIGUSR1 to this process (MyProcPid).
///   3. Copy every record batch from `reader` into the session's shared ring
///      buffer as an Arrow IPC stream, then send SIGUSR1 to the session's
///      peer process.
///   4. Block on the condition variable until the session reports an error,
///      an interrupt is pending, or the request is marked finished.
///
/// @param sessionID Identifier passed to find_session().
/// @param handle    Prepared statement handle stored in the queued request.
/// @param reader    Source of the schema and record batches to forward.
/// @return The request's `nUpdatedRecords` on success. Otherwise the first
///         failing status from the session lookup, the reader, the IPC
///         writer or check_local_session_error(), or
///         `Status::Invalid("interrupted")` when an interrupt is pending
///         after the wait.
arrow::Result<int64_t> update_prepared_statement(
    uint64_t sessionID,
    const std::string& handle,
    arrow::flight::FlightMessageReader* reader)
{
#ifdef AFS_VERBOSE
    const char* tag = "update prepared statement";
#endif
    ARROW_ASSIGN_OR_RAISE(auto session, find_session(sessionID));

    // Publish the request under the mutex, then signal this process.
    // NOTE(review): presumably the SIGUSR1 handler drains
    // updatePreparedStatementRequests_ - confirm against the handler.
    auto pending =
        std::make_shared<UpdatePreparedStatementRequest>(session, handle);
    {
        std::lock_guard<std::mutex> guard(mutex_);
        updatePreparedStatementRequests_.push_back(pending);
    }
    kill(MyProcPid, SIGUSR1);

    // Read only after the signal above has been sent, as in the original
    // ordering.
    const auto peerPid = session->peerPID;

    // Scoped so the IPC writer and the ring buffer stream are destroyed
    // before we start waiting for the result.
    {
        ARROW_ASSIGN_OR_RAISE(const auto& inputSchema, reader->GetSchema());
        SharedRingBufferOutputStream sink(this, session);
        auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults();
        writeOptions.emit_dictionary_deltas = true;
        ARROW_ASSIGN_OR_RAISE(
            auto streamWriter,
            arrow::ipc::MakeStreamWriter(&sink, inputSchema, writeOptions));

        // Forward batches until the reader yields a chunk without data.
        for (;;)
        {
            ARROW_ASSIGN_OR_RAISE(const auto& next, reader->Next());
            if (!next.data)
            {
                break;
            }
            ARROW_RETURN_NOT_OK(streamWriter->WriteRecordBatch(*(next.data)));
        }
        ARROW_RETURN_NOT_OK(streamWriter->Close());

        P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, peerPid);
        kill(peerPid, SIGUSR1);
    }

    // Scoped so mutex_ is released before the session error is inspected.
    {
        std::unique_lock<std::mutex> waitLock(mutex_);
        // Evaluation order matters: session error, then pending interrupts,
        // then request completion.
        const auto wakeUp = [&] {
            return session->errorMessage.has_value() ||
                   INTERRUPTS_PENDING_CONDITION() || pending->finished;
        };
        conditionVariable_.wait(waitLock, wakeUp);
    }

    // A session error takes precedence over a pending interrupt.
    ARROW_RETURN_NOT_OK(check_local_session_error(session));
    if (INTERRUPTS_PENDING_CONDITION())
    {
        return arrow::Status::Invalid("interrupted");
    }

    P("%s: %s: %s: done: %" PRIu64, Tag, tag_, tag, pending->nUpdatedRecords);
    return pending->nUpdatedRecords;
}