arrow::Status set_parameters()

in src/afs.cc [3212:3269]


	// Serves one "set parameters" call for the session identified by
	// `sessionID`.
	//
	// Steps, in order:
	//   1. Look the session up with find_session() and enqueue a
	//      SetParametersRequest for (`session`, `handle`) under mutex_.
	//   2. Send SIGUSR1 to this process (MyProcPid).
	//   3. Re-encode every record batch read from `reader` as an Arrow IPC
	//      stream written to a SharedRingBufferOutputStream for the session.
	//   4. Send SIGUSR1 to the process stored in the session's peerPID.
	//   5. Block on conditionVariable_ until the session has an error message,
	//      an interrupt is pending, or the request is flagged as finished.
	//
	// Returns the first non-OK status produced by find_session(), by any of
	// the Arrow read/write calls, or by check_local_session_error();
	// arrow::Status::Invalid("interrupted") when an interrupt is pending after
	// the wait; arrow::Status::OK() otherwise.
	//
	// `writer` is not used by this method.
	arrow::Status set_parameters(uint64_t sessionID,
	                             const std::string& handle,
	                             arrow::flight::FlightMessageReader* reader,
	                             arrow::flight::FlightMetadataWriter* writer)
	{
#ifdef AFS_VERBOSE
		const char* tag = "set parameters";
#endif
		ARROW_ASSIGN_OR_RAISE(auto session, find_session(sessionID));
		const auto pending = std::make_shared<SetParametersRequest>(session, handle);
		{
			// The request queue is guarded by mutex_.
			std::lock_guard<std::mutex> guard(mutex_);
			setParametersRequests_.push_back(pending);
		}
		// NOTE(review): presumably this nudges the owning process to pick up
		// the queued request -- confirm against the SIGUSR1 handler.
		kill(MyProcPid, SIGUSR1);
		const auto executorPid = session->peerPID;
		{
			// This scope bounds the lifetime of the output stream and the IPC
			// writer: both are destroyed before the wait below begins.
			ARROW_ASSIGN_OR_RAISE(const auto& inputSchema, reader->GetSchema());
			SharedRingBufferOutputStream ringOutput(this, session);
			auto ipcOptions = arrow::ipc::IpcWriteOptions::Defaults();
			ipcOptions.emit_dictionary_deltas = true;
			ARROW_ASSIGN_OR_RAISE(
				auto ipcWriter,
				arrow::ipc::MakeStreamWriter(&ringOutput, inputSchema, ipcOptions));
			for (;;)
			{
				ARROW_ASSIGN_OR_RAISE(const auto& batch, reader->Next());
				// A chunk without data ends the loop.
				if (batch.data == nullptr)
				{
					break;
				}
				ARROW_RETURN_NOT_OK(ipcWriter->WriteRecordBatch(*batch.data));
			}
			ARROW_RETURN_NOT_OK(ipcWriter->Close());
			P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, executorPid);
			kill(executorPid, SIGUSR1);
		}
		{
			// Wake-up conditions are tested in this order: session error,
			// pending interrupt, request finished.
			const auto shouldWake = [&]() -> bool {
				return session->errorMessage.has_value() ||
				       INTERRUPTS_PENDING_CONDITION() || pending->finished;
			};
			std::unique_lock<std::mutex> waitLock(mutex_);
			conditionVariable_.wait(waitLock, shouldWake);
		}
		// A session error takes precedence over a pending interrupt.
		ARROW_RETURN_NOT_OK(check_local_session_error(session));
		if (!INTERRUPTS_PENDING_CONDITION())
		{
			P("%s: %s: %s: done", Tag, tag_, tag);
			return arrow::Status::OK();
		}
		return arrow::Status::Invalid("interrupted");
	}