async function filterProgressionTasks()

in projects/deliberation_at_scale/packages/orchestrator/src/tasks/updateRoomProgression.ts [639:775]


async function filterProgressionTasks(options: ProgressionTasksContext) {
    const { progressionTasks, roomId, helpers, progressionLayerId } = options;

    // resolved = should be handled
    // rejected = cooldown or something else is not verified
    // undefined = should be skipped
    const settledProgressionTasks = await Promise.allSettled(progressionTasks.map(async (progressionTask) => {
        const { id: progressionTaskId, active = true, cooldown, maxAttempts } = progressionTask;
        const { blockProgression = true, minMessageAmount, maxMessageAmount, durationMs: cooldownMs, startDelayMs } = cooldown ?? {};
        const jobKey = generateProgressionJobKey(roomId, progressionTaskId);

        if (!active) {
            return;
        }

        // skip this task if we have reached the maximum amount of attempts
        if (maxAttempts) {
            const moderations = await getCompletedModerationsByJobKey(jobKey, maxAttempts);
            const moderationAmount = moderations?.length ?? 0;

            // guard: check if we have failed moderations
            if (moderationAmount >= maxAttempts) {
                helpers.logger.info(`The maximum amount of attempts has been reached for job ${jobKey}. Attempts: ${moderationAmount}, max attempts: ${maxAttempts}.`);
                return;
            }
        }

        if (startDelayMs) {
            const roomProgressionJobKey = getModerationJobKeyForRoomProgression(roomId, progressionLayerId);
            const roomProgressionModeration = await getLastCompletedModerationByJobKey(roomProgressionJobKey);
            let startDelayReferenceDate = dayjs(roomProgressionModeration?.created_at);

            // when there is no moderation yet we will use the room creation date as a reference
            // this if for example needed when we're still in the first layer of the progression
            if (!roomProgressionModeration) {
                const room = await getRoomById(roomId);
                startDelayReferenceDate = dayjs(room?.created_at);
            }

            // check if we need to delay the first time this moderation ever runs
            // NOTE: first check this, because no fetching of messages is required
            if (startDelayMs) {
                const delayRemainder = Math.abs(dayjs().diff(startDelayReferenceDate, 'ms'));
                const isDelaying = delayRemainder < startDelayMs;

                // guard: check if the delay is valid
                if (isDelaying) {
                    helpers.logger.info(`The start delay is still active for job ${jobKey}. Delay remaining: ${delayRemainder}ms.`);

                    if (blockProgression) {
                        const errorMessage = `Progression task ${progressionTaskId} for room ${roomId} is still delaying and should block progress. Delay: ${startDelayMs}ms.`;
                        helpers.logger.error(errorMessage);
                        return Promise.reject(errorMessage);
                    } else {
                        helpers.logger.info(`Progression task ${progressionTaskId} for room ${roomId} is still delaying, but should NOT block progress. Delay: ${startDelayMs}ms.`);
                        return;
                    }
                }
            }
        }

        // check for a couple of properties that need the last moderation to be fetched
        // TODO: refactor these checks into a single function, because a lot of the logic is the same
        // we will do this once we have discovered more of these checks
        if (minMessageAmount || maxMessageAmount || cooldownMs) {
            const lastModeration = await getLastCompletedModerationByJobKey(jobKey);
            const lastModerationDate = dayjs(lastModeration?.created_at);

            // check if we need to verify the cooldown before adding the job
            // NOTE: first check this, because no fetching of messages is required
            if (cooldownMs && lastModeration) {
                const cooldownRemainder = lastModerationDate.add(cooldownMs, 'ms').diff(dayjs(), 'ms');
                const isCoolingDown = cooldownRemainder > 0;

                // guard: check if the cooldown is valid
                if (isCoolingDown) {
                    helpers.logger.info(`The cooldown is still active for job ${jobKey}. Cooldown remaining: ${cooldownRemainder}ms.`);

                    if (blockProgression) {
                        const errorMessage = `Progression task ${progressionTaskId} for room ${roomId} is still cooling down and should block progress. Cooldown: ${cooldownRemainder}ms.`;
                        helpers.logger.error(errorMessage);
                        return Promise.reject(errorMessage);
                    } else {
                        helpers.logger.info(`Progression task ${progressionTaskId} for room ${roomId} is still cooling down, but should NOT block progress. Cooldown: ${cooldownRemainder}ms.`);
                        return;
                    }
                }
            }

            // check if we need to verify the amount of messages before adding the job
            if ((minMessageAmount || maxMessageAmount) && lastModeration) {
                const fromDate = (lastModeration ? lastModerationDate : undefined);
                const newMessages = await getMessagesAfter({
                    roomId,
                    fromDate,
                });
                const newMessageAmount = newMessages.length;

                // guard: check if the min max is not exceeded
                if ((minMessageAmount && newMessageAmount < minMessageAmount) || (maxMessageAmount && newMessageAmount > maxMessageAmount)) {
                    const minMaxMessage = `Range: min ${minMessageAmount}, max ${maxMessageAmount}, actual: ${newMessageAmount}.`;
                    helpers.logger.info(`The new amount of messages after ${lastModerationDate} out of range for job ${jobKey}. ${minMaxMessage}`);

                    if (blockProgression) {
                        const errorMessage = `Progression task ${progressionTaskId} for room ${roomId} has out or range messages and should block progress. ${minMaxMessage}`;
                        helpers.logger.error(errorMessage);
                        return Promise.reject(errorMessage);
                    } else {
                        helpers.logger.info(`Progression task ${progressionTaskId} for room ${roomId} has out of range messages, but should NOT block progress. ${minMaxMessage}`);
                        return;
                    }
                }
            }
        }

        return progressionTask;
    }));
    const erroredProgressionTasks = settledProgressionTasks.filter((settledProgressionTask) => {
        return settledProgressionTask.status === 'rejected';
    });
    const hasErroredProgressionTasks = erroredProgressionTasks.length > 0;
    const fulfilledProgressionTasks = settledProgressionTasks.filter((settledProgressionTask) => {
        return settledProgressionTask.status === 'fulfilled';
    });
    const settledValidProgressionTasks = fulfilledProgressionTasks.filter((fulfilledProgressionTask) => {
        return fulfilledProgressionTask.status === 'fulfilled' && fulfilledProgressionTask.value !== undefined;
    }) as PromiseFulfilledResult<ProgressionTask>[];
    const validProgressionTasks = settledValidProgressionTasks.map((settledValidProgressionTask) => {
        return settledValidProgressionTask.value;
    });

    return {
        hasErroredProgressionTasks,
        erroredProgressionTasks,
        validProgressionTasks,
    };
}