in src/afs.cc [2915:2974]
// Makes one pass over prepareRequests_ while holding mutex_, advancing each
// queued prepare request by at most one step:
//
//   * The request's session can no longer be found in sessions_:
//     the request is marked finished, a "stolen session" error is stored in
//     its local session, and it is removed from the queue.
//   * The request has not been dispatched yet (processing == false):
//     the query is written into the shared session, the session action is
//     set to Action::Prepare, SIGUSR1 is sent to the session's executor PID,
//     and the request is marked as processing. It stays queued.
//   * The request has been dispatched (processing == true):
//     it is finished and removed as soon as the shared session carries either
//     a valid errorMessage or a valid preparedStatementHandle; in the latter
//     case request->handle is pointed at the handle's address in area_.
//     Otherwise it stays queued for a later pass.
//
// NOTE(review): SharedSessionReleaser presumably releases the entry returned
// by dshash_find() when it goes out of scope -- confirm against its
// definition. The scopes below keep it alive across exactly the same
// statements as before.
void process_prepare_requests()
{
#ifdef AFS_VERBOSE
	const char* tag = "prepare";
#endif
	std::lock_guard<std::mutex> lock(mutex_);
	auto it = prepareRequests_.begin();
	while (it != prepareRequests_.end())
	{
		// Reference into the container: must not be touched after erase().
		auto& request = *it;
		auto session = static_cast<SharedSessionData*>(
			dshash_find(sessions_, &(request->localSession->id), false));

		// Case 1: the session is not in the table; fail the request.
		if (!session)
		{
			request->finished = true;
			request->localSession->errorMessage =
				std::string("stolen session: ") +
				std::to_string(request->localSession->id);
			it = prepareRequests_.erase(it);
			continue;
		}

		// Case 2: first visit; hand the query over to the executor.
		if (!request->processing)
		{
			// Read the PID before the session entry is released below.
			auto executorPID = session->executorPID;
			{
				SharedSessionReleaser sessionReleaser(sessions_, session);
				ProcessorLockGuard processorLock(this);
				set_shared_string(session->prepareQuery, request->query);
				session->action = Action::Prepare;
				session->nUpdatedRecords = -1;
				set_shared_string(session->preparedStatementHandle,
				                  std::string(""));
			}
			P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, executorPID);
			// NOTE(review): the result of kill() is not checked here.
			kill(executorPID, SIGUSR1);
			request->processing = true;
			++it;
			continue;
		}

		// Case 3: already dispatched; check whether a result has arrived.
		SharedSessionReleaser sessionReleaser(sessions_, session);
		const bool hasError = DsaPointerIsValid(session->errorMessage);
		if (!hasError && !DsaPointerIsValid(session->preparedStatementHandle))
		{
			// Neither an error nor a handle yet; look again on the next pass.
			++it;
			continue;
		}

		request->processing = false;
		request->finished = true;
		if (!hasError)
		{
			// An error takes precedence; the handle is only published when
			// no error message is present.
			request->handle = static_cast<const char*>(
				dsa_get_address(area_, session->preparedStatementHandle));
		}
		it = prepareRequests_.erase(it);
	}
}