in src/afs.cc [3212:3269]
// Forwards the parameter record batches of a Flight upload to the session's
// peer process and blocks until the matching request is reported finished.
//
// sessionID: identifies the session; resolved through find_session().
// handle:    passed unchanged to the SetParametersRequest constructor.
// reader:    source of the schema and of the record batches to forward.
// writer:    not used by this function.
//
// Returns the first non-OK status produced while resolving the session or
// copying the stream, the status from check_local_session_error(),
// Invalid("interrupted") when INTERRUPTS_PENDING_CONDITION() holds after the
// wait, and OK otherwise.
arrow::Status set_parameters(uint64_t sessionID,
                             const std::string& handle,
                             arrow::flight::FlightMessageReader* reader,
                             arrow::flight::FlightMetadataWriter* writer)
{
#ifdef AFS_VERBOSE
	const char* tag = "set parameters";
#endif
	ARROW_ASSIGN_OR_RAISE(auto localSession, find_session(sessionID));

	// Queue the request under the mutex, then signal our own process.
	// NOTE(review): presumably SIGUSR1 wakes the loop that drains
	// setParametersRequests_ -- confirm against the signal handler.
	auto pendingRequest =
		std::make_shared<SetParametersRequest>(localSession, handle);
	{
		std::lock_guard<std::mutex> guard(mutex_);
		setParametersRequests_.push_back(pendingRequest);
	}
	kill(MyProcPid, SIGUSR1);

	// Read the peer PID only after the self-signal, as before.
	auto executorPID = localSession->peerPID;
	{
		// Re-encode the incoming batches as an Arrow IPC stream written to
		// the session's ring-buffer output stream.
		ARROW_ASSIGN_OR_RAISE(const auto& inputSchema, reader->GetSchema());
		SharedRingBufferOutputStream ringOutput(this, localSession);
		auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults();
		writeOptions.emit_dictionary_deltas = true;
		ARROW_ASSIGN_OR_RAISE(
			auto ipcWriter,
			arrow::ipc::MakeStreamWriter(&ringOutput, inputSchema, writeOptions));
		for (;;)
		{
			ARROW_ASSIGN_OR_RAISE(const auto& streamChunk, reader->Next());
			// A chunk without data marks the end of the input stream.
			if (!streamChunk.data)
			{
				break;
			}
			ARROW_RETURN_NOT_OK(ipcWriter->WriteRecordBatch(*streamChunk.data));
		}
		ARROW_RETURN_NOT_OK(ipcWriter->Close());
		P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, executorPID);
		// Signal the peer only once the whole stream has been written.
		kill(executorPID, SIGUSR1);
	}

	{
		// Sleep until the session records an error, an interrupt is pending,
		// or the request is flagged as finished. The checks run in that
		// order and stop at the first one that holds.
		std::unique_lock<std::mutex> waitLock(mutex_);
		conditionVariable_.wait(waitLock, [&] {
			return localSession->errorMessage.has_value() ||
			       INTERRUPTS_PENDING_CONDITION() || pendingRequest->finished;
		});
	}

	// A session error takes precedence over a pending interrupt.
	ARROW_RETURN_NOT_OK(check_local_session_error(localSession));
	if (INTERRUPTS_PENDING_CONDITION())
	{
		return arrow::Status::Invalid("interrupted");
	}
	P("%s: %s: %s: done", Tag, tag_, tag);
	return arrow::Status::OK();
}