export async function POST()

in src/routes/admin/export/+server.ts [15:159]


/** One conversation as emitted by the export aggregations' `$project` stage. */
type ParquetExportConversation = {
	title: string;
	created_at: Date;
	updated_at: Date;
	messages: Message[];
};

/**
 * Trailing aggregation stages shared by both export queries: emit one document per
 * looked-up conversation and keep only the fields written to the Parquet file.
 * Built fresh on each call so separate pipelines never share stage objects.
 */
function flattenConversationStages() {
	return [
		{ $unwind: "$conversations" },
		{
			$project: {
				title: "$conversations.title",
				created_at: "$conversations.createdAt",
				updated_at: "$conversations.updatedAt",
				messages: "$conversations.messages",
			},
		},
	];
}

/**
 * Maps an aggregated conversation onto the Parquet schema declared in `POST`.
 * A message's `score` is only written when truthy, so both a missing score and a
 * score of 0 leave the optional column empty.
 */
function toParquetRow(conversation: ParquetExportConversation) {
	return {
		title: conversation.title,
		created_at: conversation.created_at,
		updated_at: conversation.updated_at,
		messages: conversation.messages.map((message: Message) => ({
			from: message.from,
			content: message.content,
			...(message.score ? { score: message.score } : undefined),
		})),
	};
}

/**
 * Exports every conversation for the requested model whose owner enabled
 * `shareConversationsWithModelAuthors`, writes them to a temporary Parquet file and
 * uploads that file to the configured Hugging Face dataset.
 *
 * Request body: `{ model: string }` (validated with zod).
 * Responds with an empty `Response` once the upload has completed.
 * Fails with a 500 when `PARQUET_EXPORT_DATASET` or `PARQUET_EXPORT_HF_TOKEN` is unset.
 * The temporary file is removed whether or not the export/upload succeeds.
 */
export async function POST({ request }) {
	if (!config.PARQUET_EXPORT_DATASET || !config.PARQUET_EXPORT_HF_TOKEN) {
		error(500, "Parquet export is not configured.");
	}

	const { model } = z
		.object({
			model: z.string(),
		})
		.parse(await request.json());

	const schema = new parquet.ParquetSchema({
		title: { type: "UTF8" },
		created_at: { type: "TIMESTAMP_MILLIS" },
		updated_at: { type: "TIMESTAMP_MILLIS" },
		messages: {
			repeated: true,
			fields: {
				from: { type: "UTF8" },
				content: { type: "UTF8" },
				score: { type: "INT_8", optional: true },
			},
		},
	});

	const fileName = `/tmp/conversations-${new Date().toJSON().slice(0, 10)}-${Date.now()}.parquet`;

	const writer = await parquet.ParquetWriter.openFile(schema, fileName);

	let count = 0;

	/** Streams one aggregation's results into the Parquet file, logging every 1,000 rows. */
	const appendAll = async (conversations: AsyncIterable<ParquetExportConversation>) => {
		for await (const conversation of conversations) {
			await writer.appendRow(toParquetRow(conversation));
			++count;

			if (count % 1_000 === 0) {
				logger.info(`Exported ${count} conversations`);
			}
		}
	};

	try {
		try {
			logger.info(`Exporting conversations for model ${model}`);

			// Settings stored per session (no userId): join conversations on sessionId.
			await appendAll(
				collections.settings.aggregate<ParquetExportConversation>([
					{
						$match: {
							shareConversationsWithModelAuthors: true,
							sessionId: { $exists: true },
							userId: { $exists: false },
						},
					},
					{
						$lookup: {
							from: "conversations",
							localField: "sessionId",
							foreignField: "sessionId",
							as: "conversations",
							pipeline: [{ $match: { model, userId: { $exists: false } } }],
						},
					},
					...flattenConversationStages(),
				])
			);

			logger.info("exporting convos with userId");

			// Settings owned by a user: join conversations on userId.
			await appendAll(
				collections.settings.aggregate<ParquetExportConversation>([
					{ $match: { shareConversationsWithModelAuthors: true, userId: { $exists: true } } },
					{
						$lookup: {
							from: "conversations",
							localField: "userId",
							foreignField: "userId",
							as: "conversations",
							pipeline: [{ $match: { model } }],
						},
					},
					...flattenConversationStages(),
				])
			);
		} finally {
			// Always release the file handle, even when the export fails part-way.
			await writer.close();
		}

		logger.info(`Uploading ${fileName} to Hugging Face Hub`);

		await uploadFile({
			file: pathToFileURL(fileName) as URL,
			credentials: { accessToken: config.PARQUET_EXPORT_HF_TOKEN },
			repo: {
				type: "dataset",
				name: config.PARQUET_EXPORT_DATASET,
			},
		});

		logger.info("Upload done");
	} finally {
		// Best-effort cleanup: a failed delete must not mask an export/upload error.
		await unlink(fileName).catch((err: unknown) => {
			logger.error(`Failed to delete temporary export file ${fileName}: ${String(err)}`);
		});
	}

	return new Response();
}