in projects/deliberation_at_scale/packages/orchestrator/src/tasks/updateRoomProgression.ts [343:421]
async function triggerEnrichments(options: TriggerEnrichmentsOptions) {
const { enrichments, progressionTaskBaseContext, executionTypes, helpers } = options;
const { roomId, progressionLayerId } = progressionTaskBaseContext;
// guard: check if the enrichments are valid
if (!enrichments || isEmpty(enrichments)) {
return;
}
const activeEnrichments = enrichments.filter((enrichment) => {
const { active = true, executionType = 'onNotVerified' } = enrichment;
return active && executionTypes.includes(executionType);
});
const validConditionEnrichmentIds = flat(await Promise.all(activeEnrichments.map(async (enrichment) => {
const { conditions = [], id: enrichmentId } = enrichment;
// guard: check if there are conditions
if (isEmpty(conditions)) {
return Promise.all([Promise.resolve(enrichmentId)]);
}
// handle multiple conditions
return Promise.all(conditions?.map(async (condition) => {
return new Promise<string>((resolve) => {
const { progressionTaskId, isVerified: shouldBeVerified } = condition;
const jobKey = generateProgressionJobKey(roomId, progressionTaskId);
getLastCompletedModerationByJobKey(jobKey).then((lastModeration) => {
const result = lastModeration?.result as unknown as VerificationFunctionCompletionResult;
const isVerified = result?.verified ?? false;
// when the required verification matches the result the promise should resolve to true
if (isVerified === shouldBeVerified) {
return resolve(enrichmentId);
}
resolve('');
});
});
}));
})));
const filteredEnrichments = activeEnrichments.filter((activeEnrichment) => {
const { id: enrichmentId, conditions } = activeEnrichment;
const hasValidConditions = validConditionEnrichmentIds.includes(enrichmentId);
if (hasValidConditions) {
helpers.logger.info(`Enrichment ${enrichmentId} has valid conditions and will be triggered: ${JSON.stringify(conditions)}`);
} else {
helpers.logger.info(`Enrichment ${enrichmentId} has invalid conditions and will be skipped: ${JSON.stringify(conditions)}`);
}
return hasValidConditions;
});
const nonBlockingEnrichments = filteredEnrichments.filter((enrichment) => {
return !enrichment.waitFor;
});
const blockingEnrichments = filteredEnrichments.filter((enrichment) => {
return enrichment.waitFor;
});
helpers.logger.info(`Triggering ${filteredEnrichments.length} enrichments (types: ${JSON.stringify(executionTypes)}) for room ${roomId} in progression layer ${progressionLayerId}.`);
// NOTE: catch errors here, because we want to continue with the other enrichments
// try-catch clause only works with awaiting promises.
handleProgressionTasks({
progressionTasks: nonBlockingEnrichments,
...progressionTaskBaseContext,
}).catch((error) => {
helpers.logger.error(`Could not trigger non blocking enrichments for room ${roomId} in layer ${progressionLayerId}: ${error}`);
});
try {
await handleProgressionTasks({
progressionTasks: blockingEnrichments,
...progressionTaskBaseContext,
});
} catch (error) {
helpers.logger.error(`Could not trigger blocking enrichments for room ${roomId} in layer ${progressionLayerId}: ${error}`);
}
}