in projects/deliberation_at_scale/packages/orchestrator/src/tasks/enrichVoteCheck.ts [71:439]
export default async function enrichVoteCheck(payload: BaseProgressionWorkerTaskPayload, helpers: Helpers) {
const { roomId } = payload;
const [roomResult, latestOutcomeResult, outcomesResult, participantsResult, roomBotMessagesResult] = await Promise.allSettled([
getRoomById(roomId),
getLatestOutcomeByRoomId(roomId),
getOutcomesByRoomId(roomId),
getParticipantsByRoomId(roomId),
getMessagesAfter({
roomId,
limit: DEFAULT_MESSAGE_LIMIT,
types: ['bot'],
}),
]);
// guard: check if the required data is valid
if (roomResult.status === 'rejected' ||
latestOutcomeResult.status === 'rejected' ||
outcomesResult.status === 'rejected' ||
participantsResult.status === 'rejected' ||
roomBotMessagesResult.status === 'rejected') {
throw Error(`Could not get the latest outcome, participants, or messages for the vote check enrichment.`);
}
const room = roomResult?.value;
const topicId = room?.topic_id;
const latestOutcome = latestOutcomeResult?.value;
const outcomes = outcomesResult?.value;
const participants = participantsResult?.value;
const roomBotMessages = roomBotMessagesResult?.value;
// guard: send an initial outcome when there is none yet
if (!latestOutcome) {
helpers.logger.info(`Sending initial outcome for room ${roomId}...`);
await sendNewCrossPollination({
roomId,
topicId,
helpers,
attemptSendBotMessage: (options) => {
sendBotMessage(options);
},
});
return;
}
// fetching more data when outcome is found
const latestOutcomeId = latestOutcome.id;
const latestOutcomeContent = latestOutcome.content;
const participantIds = participants?.map(participant => participant.id);
const roomOutcomeAmount = outcomes?.length ?? 0;
const latestBotOutcomeMessages = await getMessagesAfter({
roomId,
limit: DEFAULT_MESSAGE_LIMIT,
types: ['bot'],
fromDate: dayjs(latestOutcome.created_at),
});
const latestOutcomeMessages = await getMessagesAfter({
roomId,
limit: DEFAULT_MESSAGE_LIMIT,
fromDate: dayjs(latestOutcome?.created_at),
});
const latestOutcomeMessagesWithoutTags = latestOutcomeMessages.filter((message) => {
return isEmpty(message.tags);
});
// time helpers
const timeSinceRoomStartedMs = Math.abs(dayjs().diff(dayjs(room?.created_at), 'ms'));
const timeSinceLatestOutcome = Math.abs(dayjs().diff(dayjs(latestOutcome.created_at), 'ms'));
const shouldInviteVote = timeSinceLatestOutcome > INVITE_VOTE_AFTER_MS;
const shouldInviteOpenDiscussion = timeSinceLatestOutcome > INVITE_OPEN_DISCUSSION_AFTER_MS;
const shouldInviteToPass = timeSinceLatestOutcome > INVITE_PASS_AFTER_MS;
const shouldTimeoutVote = timeSinceLatestOutcome > TIMEOUT_VOTE_AFTER_MS;
const shouldTimeoutConversation = timeSinceRoomStartedMs > TIMEOUT_CONVSERSATION_AFTER_MS;
// contribution helpers
const contributingParticipantIds = unique(latestOutcomeMessagesWithoutTags.map((message) => message.participant_id)).filter((id) => id !== null) as string[];
const hasEveryoneContributed = contributingParticipantIds.length === participantIds.length;
const hasAnyoneContributed = contributingParticipantIds.length > 0;
const lackingContributingNicknames = getLackingNicknames({
participants,
requiredParticipantIds: contributingParticipantIds,
});
// presence helpers
const presentParticipantIds = participants?.filter((participant) => {
const timeSinceLastSeenMs = Math.abs(dayjs().diff(dayjs(participant.last_seen_at), 'ms'));
return timeSinceLastSeenMs <= NOTIFY_LEAVING_PARTICIPANT_AFTER_MS;
}).map((participant) => participant.id) ?? [];
const hasLeavingParticipants = presentParticipantIds?.length < participantIds.length;
const lackingPresenceNicknames = getLackingNicknames({
participants,
requiredParticipantIds: presentParticipantIds,
});
// request helpers
const hasRequestedNextStatement = latestOutcomeMessages.some((message) => {
return message.tags?.toLowerCase().includes('next-statement');
});
// bot message helpers
const attemptSendBotMessage = async (options: AttemptSendBotMessageOptions) => {
const {
tags, sendOnce = false,
scope = 'latestOutcome', force = false,
tagCooldownMs = BOT_TAG_COOLDOWN_MS, outcomeCooldownMs = BOT_OUTCOME_COOLDOWN_MS,
} = options;
const messages = scope === 'latestOutcome' ? latestBotOutcomeMessages : roomBotMessages;
const hasRecentlySentBotMessage = latestBotOutcomeMessages.some((message) => {
return dayjs(message.created_at).isAfter(dayjs().subtract(outcomeCooldownMs, 'ms'));
});
// guard: check if we should force the message
if (force) {
await sendBotMessage(options);
return;
}
if (hasRecentlySentBotMessage) {
return;
}
const latestMessageWithTags = messages.find((message) => {
return !!tags && message.tags?.includes(tags);
});
// guard: check if the bot has sent a message with the same tag recently
if (latestMessageWithTags) {
// skip when already sent out
if (sendOnce) {
return;
}
const timeSinceLatestMessageWithTags = Math.abs(dayjs().diff(dayjs(latestMessageWithTags.created_at), 'ms'));
// guard: check if the bot has sent a message with the same tag recently
if (timeSinceLatestMessageWithTags < tagCooldownMs) {
return;
}
}
await sendBotMessage(options);
};
// guard: timeout the conversation when taking too long
if (shouldTimeoutConversation) {
await attemptSendBotMessage({
roomId,
content: getTimeoutConversationMessageContent(),
force: true,
});
await updateRoomStatus({
roomId,
roomStatus: 'end',
helpers,
});
return;
}
// send message when there are participants who have left
if (hasLeavingParticipants) {
await attemptSendBotMessage({
roomId,
content: getLeavingParticipantsMessageContent(lackingPresenceNicknames),
tags: 'leaving-participants',
tagCooldownMs: BOT_TAG_COOLDOWN_MS * 3,
});
return;
}
// always notify the participants of the timekeeping
TIMEKEEPING_MOMENTS.forEach((timekeepingTimeMs) => {
if (timeSinceRoomStartedMs > timekeepingTimeMs) {
attemptSendBotMessage({
roomId,
content: getTimeKeepingMessageContent(timekeepingTimeMs, roomOutcomeAmount),
tags: `timekeeping-${timekeepingTimeMs}`,
sendOnce: true,
scope: 'room',
});
}
});
// send a message to invite others to contribute
if (hasAnyoneContributed && !hasEveryoneContributed) {
await attemptSendBotMessage({
roomId,
content: getInviteContributionMessageContent(lackingContributingNicknames),
tags: 'not-everyone-contributed',
tagCooldownMs: BOT_TAG_COOLDOWN_MS * 2,
});
}
// check if everyone contributed so we can attempt to make a summary
if (hasEveryoneContributed) {
const taskContent = joinMessagesContentWithParticipants(latestOutcomeMessages, participants);
const contributionCheckResult = await createVerificationFunctionCompletion({
taskInstruction: getSummarisationPrompt({
mode: 'verification',
statement: latestOutcomeContent,
}),
taskContent,
});
const isContribution = contributionCheckResult?.verified ?? false;
// check if there is no contribution yet
if (!isContribution) {
await attemptSendBotMessage({
roomId,
content: getNoVerifiedContributionMessageContent(),
tags: 'no-verified-contribution-yet',
});
}
// console.log('isContribution', contributionCheckResult, taskContent); process.exit(1);
// guard: check if there is a valuable contribution
if (isContribution) {
await attemptSendBotMessage({
roomId,
content: getVerifiedContributionMessageContent(),
tags: 'verified-contribution',
force: true,
});
const contributionResult = await createPromptCompletion({
taskInstruction: getSummarisationPrompt({
mode: 'completion',
statement: latestOutcomeContent,
}),
taskContent,
});
const hasValuableContent = !contributionResult?.includes('NO_CONTENT_FOUND') ?? false;
const hasContribution = !!contributionResult && !isEmpty(contributionResult.trim()) && hasValuableContent;
// moderator sends message "i don't think this is valuable etc"
if (!hasValuableContent) {
await attemptSendBotMessage({
roomId,
content: getNoVerifiedContributionMessageContent(),
tags: 'no-verified-contribution-yet',
});
}
if (hasContribution) {
// store the consensus and send it over to the current room for them to vote on
const { data: newOutcomes } = await supabaseClient
.from('outcomes')
.insert({
content: contributionResult,
type: 'consensus',
room_id: roomId,
topic_id: topicId,
original_outcome_id: latestOutcomeId,
})
.select();
const newOutcome = newOutcomes?.[0];
helpers.logger.info(`Created a new outcome for room ${roomId} with id ${newOutcome?.id} and content ${newOutcome?.content}`);
if (newOutcome) {
await Promise.allSettled([
attemptSendBotMessage({
roomId,
content: getNewContributionMessageContent(),
tags: 'new-contributions',
force: true,
}),
supabaseClient
.from('outcome_sources')
.insert(latestOutcomeMessages.map((message) => {
return {
outcome_id: newOutcome.id,
message_id: message.id,
};
}))
]);
return;
}
}
}
}
// guard: check if we should timeout the vote
if (shouldTimeoutVote) {
await sendNewCrossPollination({
roomId,
topicId,
helpers,
attemptSendBotMessage,
beforeSend: async () => {
helpers.logger.info(`Timing out vote for room ${roomId}...`);
await attemptSendBotMessage({
roomId,
content: getTimeoutVoteMessageContent(),
tags: 'timeout-vote',
force: true,
});
await waitFor(NEW_OUTCOME_AFTER_VOTE_TIMEOUT_MS);
},
});
return;
}
// check if we should moderate the open discussion a bit when it all takes too long
if (shouldInviteOpenDiscussion || shouldInviteToPass) {
helpers.logger.info(`Moderating open discussion in room ${roomId}...`);
// send opening invite only once
attemptSendBotMessage({
roomId,
content: getOpenDiscussionMessageContent(),
tags: 'open-discussion',
sendOnce: true,
});
}
// fetching all opinions to see what the votes look like
const { data: allOpinions } = await supabaseClient
.from('opinions')
.select()
.eq('outcome_id', latestOutcomeId)
.in('participant_id', participantIds)
.order('created_at', { ascending: false });
const opinions = unique(allOpinions ?? [], (opinion) => {
const { outcome_id, participant_id } = opinion;
return `${outcome_id}-${participant_id}`;
});
const hasEveryoneVoted = getHasEveryoneVoted({ opinions, participantIds });
const hasEveryoneVotedTheSame = getHasEveryoneVotedTheSame({ opinions });
// guard: check if a next statement is requested
if (hasEveryoneVoted && hasRequestedNextStatement) {
await sendNewCrossPollination({
roomId,
topicId,
helpers,
attemptSendBotMessage,
});
return;
}
// attempt send message to remind people to vote
if (shouldInviteVote && !hasEveryoneVoted) {
helpers.logger.info(`Sending vote reminder for room ${roomId}...`);
await attemptSendBotMessage({
roomId,
content: getVoteInviteMessageContent(),
tags: 'invite-to-vote',
tagCooldownMs: BOT_TAG_COOLDOWN_MS * 4,
});
}
// send new cross pollination when everyone has voted the same
if (hasEveryoneVoted && hasEveryoneVotedTheSame) {
await attemptSendBotMessage({
roomId,
content: getVotedSameMessageContent(),
tags: 'everyone-voted-the-same',
});
return;
}
// guard: check if everyone has not voted the same
if (hasEveryoneVoted && !hasEveryoneVotedTheSame) {
await attemptSendBotMessage({
roomId,
content: getNotEveryoneVotedTheSameMessageContent(),
tags: 'not-everyone-voted-the-same',
});
}
}