in projects/deliberation_at_scale/packages/orchestrator/src/utilities/tasks.ts [143:248]
export function createModeratorTask<PayloadType extends BaseRoomWorkerTaskPayload, ResultType>(options: CreateModeratorTaskOptions<PayloadType, ResultType>) {
return async (payload: PayloadType, helpers: Helpers): Promise<ModeratorTaskTuple> => {
const {
getTaskInstruction,
getTaskContent,
getShouldSendBotMessage = () => false,
getBotMessageContent = () => '',
performTask,
onTaskCompleted,
getShouldStoreOutcome = () => false,
getOutcomeSourcesMessageIds,
getOutcomeContent,
getOutcomeType,
} = options;
const { jobKey, roomId } = payload;
const baseTaskHelpers: BaseTaskHelpers<PayloadType> = {
helpers,
payload,
};
// guard: skip task when there is no task instruction
if (!jobKey || !roomId) {
helpers.logger.error(`No valid job key or room ID is found for the moderator task with payload: ${payload}.`);
return {};
}
// get these in parallel to each other to optimize speed
const [taskInstructionResult, taskContentResult] = await Promise.allSettled([
getTaskInstruction(baseTaskHelpers),
getTaskContent(baseTaskHelpers),
]);
// guard: retry when there is no instruction or content
if (taskInstructionResult.status === 'rejected' || taskContentResult.status === 'rejected') {
throw Error(`Could not get the task instruction or content for the moderator task with key ${jobKey}.`);
}
// perform the actual verification prompt on the payload
const taskInstruction = taskInstructionResult?.value;
const taskContent = taskContentResult?.value?.trim();
// guard: skip task when there is no task instruction or content
if (isEmpty(taskInstruction) || isEmpty(taskContent)) {
helpers.logger.error(`The task instruction or content is invalid for the moderator task with key ${jobKey}`);
return {};
}
const performTaskHelpers: PerformTaskHelpers<PayloadType> = {
...baseTaskHelpers,
taskInstruction,
taskContent,
};
const taskResult = await performTask(performTaskHelpers);
const resultTaskHelpers: ResultTaskHelpers<PayloadType, ResultType> = {
...performTaskHelpers,
result: taskResult,
};
const shouldSendBotMessage = await getShouldSendBotMessage(resultTaskHelpers);
const botMessageContent = await getBotMessageContent(resultTaskHelpers);
const shouldStoreOutcome = await getShouldStoreOutcome(resultTaskHelpers);
// guard: throw error when we should send a bot message but its invalid
if (shouldSendBotMessage && isEmpty(botMessageContent)) {
throw Error(`The bot message content is invalid for the moderator task with key ${jobKey}, result: ${JSON.stringify(taskResult)}`);
}
helpers.logger.info(`The moderator task with key ${jobKey} has been completed with the following result:`);
helpers.logger.info(JSON.stringify(taskResult, null, 2));
// execute these in parallel to each other
const results = await Promise.allSettled([
// always store the result of this verification in the database for logging purposes
storeModerationResult({
jobKey,
result: (taskResult as unknown as Json),
}),
// store the outcome only when requested
shouldStoreOutcome && storeOutcome({
helpers: resultTaskHelpers,
getOutcomeSourcesMessageIds,
getOutcomeContent,
getOutcomeType,
}),
// execute the the callback when it is defined
!!onTaskCompleted && onTaskCompleted(resultTaskHelpers),
// send a message to the room only when there is no safe language
shouldSendBotMessage && sendBotMessage({
content: PRINT_JOBKEY ? `${jobKey}: ${botMessageContent}` : botMessageContent,
roomId,
}),
]);
const moderationResult = results?.[0];
const outcomeResult = results?.[1];
const moderation = moderationResult.status === 'fulfilled' ? moderationResult?.value : undefined;
const outcome = outcomeResult.status === 'fulfilled' && outcomeResult?.value !== false ? outcomeResult?.value : undefined;
return {
moderation,
outcome,
};
};
}