in src/routes/conversation/[id]/+server.ts [342:436]
/**
 * Applies one message update to the in-progress message / conversation and
 * forwards it to the client stream.
 *
 * Depending on the update type this:
 * - appends streamed tokens to the message content and records token metrics,
 * - appends streamed reasoning tokens to the message reasoning,
 * - stores a new conversation title (in memory and in the conversations collection),
 * - sets the final text and interrupted flag and records the latency metric,
 * - attaches a produced file to the message.
 *
 * Every update except stream tokens, keep-alive statuses and reasoning stream
 * tokens is also appended to the message's `updates` list.
 *
 * @param event - the update to apply and send to the client
 * @throws Error when there is no message or conversation to write to
 */
async function update(event: MessageUpdate) {
  if (!messageToWriteTo || !conv) {
    throw Error("No message or conversation to write events to");
  }

  if (event.type === MessageUpdateType.Stream) {
    // Empty tokens carry no content: nothing to store, measure or send.
    if (event.token === "") return;

    messageToWriteTo.content += event.token;

    // add to token total
    MetricsServer.getMetrics().model.tokenCountTotal.inc({ model: model?.id });

    // Take one clock reading so the observation and the stored timestamp agree.
    const now = Date.now();
    if (!lastTokenTimestamp) {
      // First token: record time-to-first-token only. There is no previous
      // token yet, so observing a per-token gap here would just add a ~0ms
      // sample to the time-per-output-token histogram.
      MetricsServer.getMetrics().model.timeToFirstToken.observe(
        { model: model?.id },
        now - promptedAt.getTime()
      );
    } else {
      // Subsequent tokens: record the gap since the previous token.
      MetricsServer.getMetrics().model.timePerOutputToken.observe(
        { model: model?.id },
        now - lastTokenTimestamp.getTime()
      );
    }
    lastTokenTimestamp = new Date(now);
  } else if (
    event.type === MessageUpdateType.Reasoning &&
    event.subtype === MessageReasoningUpdateType.Stream
  ) {
    // Append the streamed reasoning token, creating the field on first use.
    messageToWriteTo.reasoning ??= "";
    messageToWriteTo.reasoning += event.token;
  } else if (event.type === MessageUpdateType.Title) {
    // Set the title in memory and persist it right away.
    conv.title = event.title;
    await collections.conversations.updateOne(
      { _id: convId },
      { $set: { title: conv.title, updatedAt: new Date() } }
    );
  } else if (event.type === MessageUpdateType.FinalAnswer) {
    // Set the final text and the interrupted flag. The final text replaces
    // whatever was streamed so far, on top of the initial message content.
    messageToWriteTo.interrupted = event.interrupted;
    messageToWriteTo.content = initialMessageContent + event.text;

    // add to latency
    MetricsServer.getMetrics().model.latency.observe(
      { model: model?.id },
      Date.now() - promptedAt.getTime()
    );
  } else if (event.type === MessageUpdateType.File) {
    // Add the file to the message, keeping any files already attached.
    messageToWriteTo.files = [
      ...(messageToWriteTo.files ?? []),
      { type: "hash", name: event.name, value: event.sha, mime: event.mime },
    ];
  }

  // Append to the persistent message updates, except for the high-frequency
  // updates: stream tokens, keep-alive statuses and reasoning stream tokens.
  const isKeepAlive =
    event.type === MessageUpdateType.Status && event.status === MessageUpdateStatus.KeepAlive;
  const isReasoningStream =
    event.type === MessageUpdateType.Reasoning &&
    event.subtype === MessageReasoningUpdateType.Stream;
  if (event.type !== MessageUpdateType.Stream && !isKeepAlive && !isReasoningStream) {
    messageToWriteTo.updates?.push(event);
  }

  // Avoid remote keylogging attack executed by watching packet lengths
  // by padding the text with null chars to a fixed length
  // https://cdn.arstechnica.net/wp-content/uploads/2024/03/LLM-Side-Channel.pdf
  // The padding is applied to a copy, so the stored content is never padded.
  // NOTE(review): reasoning stream tokens are sent unpadded; confirm whether
  // this mitigation should cover them as well.
  const outgoing: MessageUpdate =
    event.type === MessageUpdateType.Stream
      ? { ...event, token: event.token.padEnd(16, "\0") }
      : event;

  // Send the update to the client as one JSON document per line
  controller.enqueue(JSON.stringify(outgoing) + "\n");

  // After the final answer, send 4096 spaces; this is intended to keep the
  // browser from holding the end of the response in its buffer.
  if (outgoing.type === MessageUpdateType.FinalAnswer) {
    controller.enqueue(" ".repeat(4096));
  }
}