app/routes/api.jobs.$jobId.logs.tsx (103 lines of code) (raw):

import { type LoaderFunctionArgs } from "@remix-run/node";
import { getJobStore } from "~/lib/server/jobStore";
import {
  extractCredentialsFromCookie,
  hasValidCredentials,
} from "~/lib/server/auth";

// GET /api/jobs/:jobId/logs - Stream job logs via Server-Sent Events
export async function loader({ request, params }: LoaderFunctionArgs) {
  const jobId = params.jobId;
  if (!jobId) {
    return new Response("Job ID is required", { status: 400 });
  }

  // Check authentication
  const cookieHeader = request.headers.get("Cookie");
  const credentials = extractCredentialsFromCookie(cookieHeader);
  if (!hasValidCredentials(credentials)) {
    return new Response("Unauthorized", { status: 401 });
  }

  // Set up Server-Sent Events stream
  const stream = new ReadableStream({
    start(controller) {
      const encoder = new TextEncoder();

      let intervalId: NodeJS.Timeout;
      let lastLogLength = 0;
      let closed = false;

      // Serialize a payload as an SSE "data:" frame. Writes after the stream
      // has closed (e.g. a poll still in flight when the client disconnects)
      // are silently dropped instead of throwing on a closed controller.
      const send = (payload: Record<string, unknown>) => {
        if (closed) return;
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify(payload)}\n\n`)
        );
      };

      // Stop polling and close the stream exactly once.
      const closeStream = () => {
        if (closed) return;
        closed = true;
        clearInterval(intervalId);
        controller.close();
      };

      // Send initial connection message
      send({ type: "connected", jobId });

      const sendLogs = async () => {
        try {
          const jobStore = getJobStore();
          const job = await jobStore.getJob(jobId);

          if (!job) {
            // Unknown job: report the error and end the stream
            send({ type: "error", message: "Job not found" });
            closeStream();
            return;
          }

          // Get current logs and send only the content added since the last poll
          const logs = await jobStore.getJobLogs(jobId);
          if (logs && logs.length > lastLogLength) {
            const newLogs = logs.substring(lastLogLength);
            lastLogLength = logs.length;

            send({
              type: "logs",
              data: newLogs,
              timestamp: new Date().toISOString(),
            });
          }

          // Send job status update
          send({
            type: "status",
            status: job.status,
            timestamp: new Date().toISOString(),
          });

          // If job is completed or failed, stop streaming
          if (job.status === "completed" || job.status === "failed") {
            send({
              type: "finished",
              status: job.status,
              timestamp: new Date().toISOString(),
            });
            closeStream();
          }
        } catch (error) {
          console.error("Error streaming logs:", error);
          send({ type: "error", message: "Failed to fetch logs" });
        }
      };

      // Poll for new logs every 2 seconds and send the first batch immediately
      intervalId = setInterval(sendLogs, 2000);
      sendLogs();

      // Clean up on client disconnect
      request.signal.addEventListener("abort", closeStream);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
      "X-Accel-Buffering": "no", // Disable proxy buffering
    },
  });
}
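
/*
 * Example client usage (illustrative sketch, not part of this route): a
 * same-origin page can consume the stream with the browser EventSource API,
 * which sends the auth cookie automatically. Closing the source on "finished"
 * matters because EventSource auto-reconnects when the server ends the stream.
 * The `jobId` variable and the "log-output" element id are assumptions made
 * for the example.
 *
 *   const source = new EventSource(`/api/jobs/${jobId}/logs`);
 *   source.onmessage = (event) => {
 *     const message = JSON.parse(event.data);
 *     if (message.type === "logs") {
 *       document.getElementById("log-output")!.textContent += message.data;
 *     } else if (message.type === "finished") {
 *       source.close();
 *     } else if (message.type === "error") {
 *       console.error(message.message);
 *     }
 *   };
 */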