async function triggerEnrichments()

in projects/deliberation_at_scale/packages/orchestrator/src/tasks/updateRoomProgression.ts [343:421]


async function triggerEnrichments(options: TriggerEnrichmentsOptions) {
    const { enrichments, progressionTaskBaseContext, executionTypes, helpers } = options;
    const { roomId, progressionLayerId } = progressionTaskBaseContext;

    // guard: check if the enrichments are valid
    if (!enrichments || isEmpty(enrichments)) {
        return;
    }

    const activeEnrichments = enrichments.filter((enrichment) => {
        const { active = true, executionType = 'onNotVerified' } = enrichment;
        return active && executionTypes.includes(executionType);
    });
    const validConditionEnrichmentIds = flat(await Promise.all(activeEnrichments.map(async (enrichment) => {
        const { conditions = [], id: enrichmentId } = enrichment;

        // guard: check if there are conditions
        if (isEmpty(conditions)) {
            return Promise.all([Promise.resolve(enrichmentId)]);
        }

        // handle multiple conditions
        return Promise.all(conditions?.map(async (condition) => {
            return new Promise<string>((resolve) => {
                const { progressionTaskId, isVerified: shouldBeVerified } = condition;
                const jobKey = generateProgressionJobKey(roomId, progressionTaskId);

                getLastCompletedModerationByJobKey(jobKey).then((lastModeration) => {
                    const result = lastModeration?.result as unknown as VerificationFunctionCompletionResult;
                    const isVerified = result?.verified ?? false;

                    // when the required verification matches the result the promise should resolve to true
                    if (isVerified === shouldBeVerified) {
                        return resolve(enrichmentId);
                    }

                    resolve('');
                });
            });
        }));
    })));
    const filteredEnrichments = activeEnrichments.filter((activeEnrichment) => {
        const { id: enrichmentId, conditions } = activeEnrichment;
        const hasValidConditions = validConditionEnrichmentIds.includes(enrichmentId);

        if (hasValidConditions) {
            helpers.logger.info(`Enrichment ${enrichmentId} has valid conditions and will be triggered: ${JSON.stringify(conditions)}`);
        } else {
            helpers.logger.info(`Enrichment ${enrichmentId} has invalid conditions and will be skipped: ${JSON.stringify(conditions)}`);
        }
        return hasValidConditions;
    });
    const nonBlockingEnrichments = filteredEnrichments.filter((enrichment) => {
        return !enrichment.waitFor;
    });
    const blockingEnrichments = filteredEnrichments.filter((enrichment) => {
        return enrichment.waitFor;
    });

    helpers.logger.info(`Triggering ${filteredEnrichments.length} enrichments (types: ${JSON.stringify(executionTypes)}) for room ${roomId} in progression layer ${progressionLayerId}.`);

    // NOTE: catch errors here, because we want to continue with the other enrichments
    // try-catch clause only works with awaiting promises.
    handleProgressionTasks({
        progressionTasks: nonBlockingEnrichments,
        ...progressionTaskBaseContext,
    }).catch((error) => {
        helpers.logger.error(`Could not trigger non blocking enrichments for room ${roomId} in layer ${progressionLayerId}: ${error}`);
    });

    try {
        await handleProgressionTasks({
            progressionTasks: blockingEnrichments,
            ...progressionTaskBaseContext,
        });
    } catch (error) {
        helpers.logger.error(`Could not trigger blocking enrichments for room ${roomId} in layer ${progressionLayerId}: ${error}`);
    }
}