async function computeStats()

in src/lib/jobs/refresh-conversation-stats.ts [31:236]


async function computeStats(params: {
	dateField: ConversationStats["date"]["field"];
	span: ConversationStats["date"]["span"];
	type: ConversationStats["type"];
}) {
	const lastComputed = await collections.conversationStats.findOne(
		{ "date.field": params.dateField, "date.span": params.span, type: params.type },
		{ sort: { "date.at": -1 } }
	);

	// If the last computed week is at the beginning of the last computed month, we need to include some days from the previous month
	// In those cases we need to compute the stats from before the last month as everything is one aggregation
	const minDate = lastComputed ? lastComputed.date.at : new Date(0);

	logger.debug(
		{ minDate, dateField: params.dateField, span: params.span, type: params.type },
		"Computing conversation stats"
	);

	const dateField = params.type === "message" ? "messages." + params.dateField : params.dateField;

	const pipeline = [
		{
			$match: {
				[dateField]: { $gte: minDate },
			},
		},
		{
			$project: {
				[dateField]: 1,
				sessionId: 1,
				userId: 1,
			},
		},
		...(params.type === "message"
			? [
					{
						$unwind: "$messages",
					},
					{
						$match: {
							[dateField]: { $gte: minDate },
						},
					},
				]
			: []),
		{
			$sort: {
				[dateField]: 1,
			},
		},
		{
			$facet: {
				userId: [
					{
						$match: {
							userId: { $exists: true },
						},
					},
					{
						$group: {
							_id: {
								at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
								userId: "$userId",
							},
						},
					},
					{
						$group: {
							_id: "$_id.at",
							count: { $sum: 1 },
						},
					},
					{
						$project: {
							_id: 0,
							date: {
								at: "$_id",
								field: params.dateField,
								span: params.span,
							},
							distinct: "userId",
							count: 1,
						},
					},
				],
				sessionId: [
					{
						$match: {
							sessionId: { $exists: true },
						},
					},
					{
						$group: {
							_id: {
								at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
								sessionId: "$sessionId",
							},
						},
					},
					{
						$group: {
							_id: "$_id.at",
							count: { $sum: 1 },
						},
					},
					{
						$project: {
							_id: 0,
							date: {
								at: "$_id",
								field: params.dateField,
								span: params.span,
							},
							distinct: "sessionId",
							count: 1,
						},
					},
				],
				userOrSessionId: [
					{
						$group: {
							_id: {
								at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
								userOrSessionId: { $ifNull: ["$userId", "$sessionId"] },
							},
						},
					},
					{
						$group: {
							_id: "$_id.at",
							count: { $sum: 1 },
						},
					},
					{
						$project: {
							_id: 0,
							date: {
								at: "$_id",
								field: params.dateField,
								span: params.span,
							},
							distinct: "userOrSessionId",
							count: 1,
						},
					},
				],
				_id: [
					{
						$group: {
							_id: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
							count: { $sum: 1 },
						},
					},
					{
						$project: {
							_id: 0,
							date: {
								at: "$_id",
								field: params.dateField,
								span: params.span,
							},
							distinct: "_id",
							count: 1,
						},
					},
				],
			},
		},
		{
			$project: {
				stats: {
					$concatArrays: ["$userId", "$sessionId", "$userOrSessionId", "$_id"],
				},
			},
		},
		{
			$unwind: "$stats",
		},
		{
			$replaceRoot: {
				newRoot: "$stats",
			},
		},
		{
			$set: {
				type: params.type,
			},
		},
		{
			$merge: {
				into: CONVERSATION_STATS_COLLECTION,
				on: ["date.at", "type", "date.span", "date.field", "distinct"],
				whenMatched: "replace",
				whenNotMatched: "insert",
			},
		},
	];

	await collections.conversations.aggregate(pipeline, { allowDiskUse: true }).next();

	logger.debug(
		{ minDate, dateField: params.dateField, span: params.span, type: params.type },
		"Computed conversation stats"
	);
}