in projects/deliberation_at_scale/packages/orchestrator/src/tasks/triggerRoomProgressionUpdates.ts [19:98]
export default async function triggerRoomProgressionUpdates(payload: TriggerRoomProgressionUpdatesPayload, helpers: Helpers) {
try {
const minRoomCreatedAt = dayjs().subtract(1, "hour").toISOString();
const minParticipantLastSeenAt = dayjs().subtract(60, "second").toISOString();
const [roomsResult, activeParticipantsResult] = await Promise.allSettled([
supabaseClient
.from('rooms')
.select()
.eq('active', true)
.not('starts_at', 'is', null)
.gt('created_at', minRoomCreatedAt),
supabaseClient
.from('participants')
.select()
.eq('active', true)
.gt('last_seen_at', minParticipantLastSeenAt),
]);
// guard: check for rejections
if (roomsResult.status === "rejected" || activeParticipantsResult.status === "rejected") {
throw new Error(`Error while fetching active rooms or locked jobs.`);
}
const rooms = roomsResult.value?.data ?? [];
const activeParticipants = activeParticipantsResult.value?.data ?? [];
const activeParticipantsAmount = activeParticipants.length;
const activeParticipantsRoomIds = activeParticipants.map((participant) => participant.room_id);
const activeRooms = rooms.filter((room) => activeParticipantsRoomIds.includes(room.id));
const activeRoomsAmount = activeRooms.length;
helpers.logger.info(`Scheduling progression updates for all ${activeRoomsAmount} active rooms with ${activeParticipantsAmount} active participants...`);
await Promise.allSettled(activeRooms.map(async (activeRoom) => {
const { id: roomId } = activeRoom;
const jobKey = `updateRoomProgression-${roomId}`;
const newJobPayload: UpdateRoomProgressionPayload = {
roomId,
jobKey,
};
const { rows: [existingJob] } = await getRunnerUtils().withPgClient((pgClient) => {
return pgClient.query("SELECT * FROM graphile_worker.jobs WHERE key = $1", [jobKey]);
});
// guard: we need to check whether the existing job is currently being executed
// and if so we will skip this room, because another worker is already working on it
if (existingJob &&
existingJob.attempts < existingJob.max_attempts &&
Math.abs(dayjs().diff(dayjs(existingJob.run_at), 'ms')) < ONE_SECOND_MS * 60 * 2
) {
helpers.logger.info(`Skipping room ${roomId} because it is already being worked on...`);
return;
}
// guard: skip when in testing mode and room is not in allowlist
if (ENABLE_SINGLE_ROOM_TESTING && !TEST_ROOM_ID_ALLOWLIST.includes(roomId)) {
return;
}
helpers.logger.info(`Scheduling progression update for room ${roomId}...`);
return await helpers.addJob("updateRoomProgression", newJobPayload, {
jobKey,
});
}));
} catch (error) {
captureEvent({
message: `Error while scheduling progression updates: ${JSON.stringify(error)}`,
level: 'error',
helpers,
});
}
await reschedule<TriggerRoomProgressionUpdatesPayload>({
workerTaskId: "triggerRoomProgressionUpdates",
jobKey: "triggerRoomProgressionUpdates",
intervalMs: UPDATE_ROOM_PROGRESSION_INTERVAL_MS,
payload: {},
helpers,
});
}