in src/afs.cc [3861:3937]
void update_sessions()
{
const char* tag = "update sessions";
P("%s: %s: %s: start", Tag, tag_, tag);
dshash_seq_status sessionsStatus;
dshash_seq_init(&sessionsStatus, sessions_, false);
while (true)
{
auto session =
static_cast<SharedSessionData*>(dshash_seq_next(&sessionsStatus));
if (!session)
{
break;
}
if (session->started)
{
if (!session->finished)
{
try
{
auto handle = executorHandles_.at(session->id);
pid_t pid;
if (GetBackgroundWorkerPid(handle, &pid) == BGWH_STOPPED)
{
// A finished session
session->finished = true;
executorHandles_.erase(session->id);
}
} catch (const std::exception& exception)
{
session->finished = true;
}
}
continue;
}
// A new session request
BackgroundWorker worker = {};
snprintf(
worker.bgw_name, BGW_MAXLEN, "%s: executor: %" PRIu64, Tag, session->id);
snprintf(
worker.bgw_type, BGW_MAXLEN, "%s: executor: %" PRIu64, Tag, session->id);
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
snprintf(worker.bgw_library_name, BGW_MAXLEN, "%s", LibraryName);
snprintf(worker.bgw_function_name, BGW_MAXLEN, "afs_executor");
worker.bgw_main_arg = Int64GetDatum(session->id);
worker.bgw_notify_pid = MyProcPid;
BackgroundWorkerHandle* handle;
P("%s: %s: %s: register executor: %" PRIu64, Tag, tag_, tag, session->id);
session->started = true;
if (RegisterDynamicBackgroundWorker(&worker, &handle))
{
executorHandles_[session->id] = handle;
WaitForBackgroundWorkerStartup(handle, &(session->executorPID));
P("%s: %s: %s: started executor: %" PRIu64, Tag, tag_, tag, session->id);
}
else
{
P("%s: %s: %s: failed to start executor: %" PRIu64,
Tag,
tag_,
tag,
session->id);
set_shared_string(
session->errorMessage,
std::string(Tag) + ": " + tag_ + ": " + tag +
": failed to start executor: " + std::to_string(session->id));
}
}
dshash_seq_term(&sessionsStatus);
P("%s: %s: %s: end", Tag, tag_, tag);
kill(sharedData_->serverPID, SIGUSR1);
}