in projects/deliberation_at_scale/packages/orchestrator/src/tasks/updateRoomProgression.ts [639:775]
async function filterProgressionTasks(options: ProgressionTasksContext) {
const { progressionTasks, roomId, helpers, progressionLayerId } = options;
// resolved = should be handled
// rejected = cooldown or something else is not verified
// undefined = should be skipped
const settledProgressionTasks = await Promise.allSettled(progressionTasks.map(async (progressionTask) => {
const { id: progressionTaskId, active = true, cooldown, maxAttempts } = progressionTask;
const { blockProgression = true, minMessageAmount, maxMessageAmount, durationMs: cooldownMs, startDelayMs } = cooldown ?? {};
const jobKey = generateProgressionJobKey(roomId, progressionTaskId);
if (!active) {
return;
}
// skip this task if we have reached the maximum amount of attempts
if (maxAttempts) {
const moderations = await getCompletedModerationsByJobKey(jobKey, maxAttempts);
const moderationAmount = moderations?.length ?? 0;
// guard: check if we have failed moderations
if (moderationAmount >= maxAttempts) {
helpers.logger.info(`The maximum amount of attempts has been reached for job ${jobKey}. Attempts: ${moderationAmount}, max attempts: ${maxAttempts}.`);
return;
}
}
if (startDelayMs) {
const roomProgressionJobKey = getModerationJobKeyForRoomProgression(roomId, progressionLayerId);
const roomProgressionModeration = await getLastCompletedModerationByJobKey(roomProgressionJobKey);
let startDelayReferenceDate = dayjs(roomProgressionModeration?.created_at);
// when there is no moderation yet we will use the room creation date as a reference
// this if for example needed when we're still in the first layer of the progression
if (!roomProgressionModeration) {
const room = await getRoomById(roomId);
startDelayReferenceDate = dayjs(room?.created_at);
}
// check if we need to delay the first time this moderation ever runs
// NOTE: first check this, because no fetching of messages is required
if (startDelayMs) {
const delayRemainder = Math.abs(dayjs().diff(startDelayReferenceDate, 'ms'));
const isDelaying = delayRemainder < startDelayMs;
// guard: check if the delay is valid
if (isDelaying) {
helpers.logger.info(`The start delay is still active for job ${jobKey}. Delay remaining: ${delayRemainder}ms.`);
if (blockProgression) {
const errorMessage = `Progression task ${progressionTaskId} for room ${roomId} is still delaying and should block progress. Delay: ${startDelayMs}ms.`;
helpers.logger.error(errorMessage);
return Promise.reject(errorMessage);
} else {
helpers.logger.info(`Progression task ${progressionTaskId} for room ${roomId} is still delaying, but should NOT block progress. Delay: ${startDelayMs}ms.`);
return;
}
}
}
}
// check for a couple of properties that need the last moderation to be fetched
// TODO: refactor these checks into a single function, because a lot of the logic is the same
// we will do this once we have discovered more of these checks
if (minMessageAmount || maxMessageAmount || cooldownMs) {
const lastModeration = await getLastCompletedModerationByJobKey(jobKey);
const lastModerationDate = dayjs(lastModeration?.created_at);
// check if we need to verify the cooldown before adding the job
// NOTE: first check this, because no fetching of messages is required
if (cooldownMs && lastModeration) {
const cooldownRemainder = lastModerationDate.add(cooldownMs, 'ms').diff(dayjs(), 'ms');
const isCoolingDown = cooldownRemainder > 0;
// guard: check if the cooldown is valid
if (isCoolingDown) {
helpers.logger.info(`The cooldown is still active for job ${jobKey}. Cooldown remaining: ${cooldownRemainder}ms.`);
if (blockProgression) {
const errorMessage = `Progression task ${progressionTaskId} for room ${roomId} is still cooling down and should block progress. Cooldown: ${cooldownRemainder}ms.`;
helpers.logger.error(errorMessage);
return Promise.reject(errorMessage);
} else {
helpers.logger.info(`Progression task ${progressionTaskId} for room ${roomId} is still cooling down, but should NOT block progress. Cooldown: ${cooldownRemainder}ms.`);
return;
}
}
}
// check if we need to verify the amount of messages before adding the job
if ((minMessageAmount || maxMessageAmount) && lastModeration) {
const fromDate = (lastModeration ? lastModerationDate : undefined);
const newMessages = await getMessagesAfter({
roomId,
fromDate,
});
const newMessageAmount = newMessages.length;
// guard: check if the min max is not exceeded
if ((minMessageAmount && newMessageAmount < minMessageAmount) || (maxMessageAmount && newMessageAmount > maxMessageAmount)) {
const minMaxMessage = `Range: min ${minMessageAmount}, max ${maxMessageAmount}, actual: ${newMessageAmount}.`;
helpers.logger.info(`The new amount of messages after ${lastModerationDate} out of range for job ${jobKey}. ${minMaxMessage}`);
if (blockProgression) {
const errorMessage = `Progression task ${progressionTaskId} for room ${roomId} has out or range messages and should block progress. ${minMaxMessage}`;
helpers.logger.error(errorMessage);
return Promise.reject(errorMessage);
} else {
helpers.logger.info(`Progression task ${progressionTaskId} for room ${roomId} has out of range messages, but should NOT block progress. ${minMaxMessage}`);
return;
}
}
}
}
return progressionTask;
}));
const erroredProgressionTasks = settledProgressionTasks.filter((settledProgressionTask) => {
return settledProgressionTask.status === 'rejected';
});
const hasErroredProgressionTasks = erroredProgressionTasks.length > 0;
const fulfilledProgressionTasks = settledProgressionTasks.filter((settledProgressionTask) => {
return settledProgressionTask.status === 'fulfilled';
});
const settledValidProgressionTasks = fulfilledProgressionTasks.filter((fulfilledProgressionTask) => {
return fulfilledProgressionTask.status === 'fulfilled' && fulfilledProgressionTask.value !== undefined;
}) as PromiseFulfilledResult<ProgressionTask>[];
const validProgressionTasks = settledValidProgressionTasks.map((settledValidProgressionTask) => {
return settledValidProgressionTask.value;
});
return {
hasErroredProgressionTasks,
erroredProgressionTasks,
validProgressionTasks,
};
}