netlify/functions/store-ping-data-background/index.ts (91 lines of code) (raw):
import { gzipSync } from "node:zlib";
import { readFile } from "node:fs/promises";
import { emptyPings, pingFields, PingFieldType, DATA_VERSION } from "app/data/format.ts";
import type { IndexedStringPingField, PingFields, TypeMap, Pings, StringIndex } from "app/data/format.ts";
import type { Context } from "@netlify/functions";
import { getStore } from "@netlify/blobs";
import { BigQuery } from "@google-cloud/bigquery";
// How far into the future the `retry` timestamp is placed when a fetch for a
// date produced no pings: 15 minutes, expressed in milliseconds.
const EMPTY_RETRY_MS = 15 * 60 * 1000;
// A single ping in row form, as handed to `condenseData`: one property per
// key of `PingFields`, whose value type is looked up through `TypeMap` using
// that field's declared type.
// NOTE(review): presumably `TypeMap<string, string | null>` selects `string`
// or `string | null` for the string-like field kinds -- confirm against
// app/data/format.ts.
type Ping = {
[K in keyof PingFields]: TypeMap<string, string | null>[PingFields[K]];
};
// Maps a string value (or null) to a numeric index. `condenseData` keeps one
// such map per indexed-string field to remember where each distinct value sits
// in that field's `strings` array.
type StringLookup = Map<string | null, number>;
// Convert row-oriented pings into the column-oriented `Pings` structure.
//
// Every field listed by `pingFields()` becomes one column of the output.
// Indexed-string fields are stored as a table of distinct `strings` plus, for
// each ping, an index into that table appended to `values`. All other fields
// are appended to their column unchanged.
//
// A ping that holds null in a field not marked `nullable` is skipped before
// any of its values are stored; a warning is logged for it, and a summary
// warning with the number of skipped pings is logged at the end.
function condenseData(data: Ping[]): Pings {
  const lookupsByField = new Map<IndexedStringPingField, StringLookup>();
  const output: Pings = emptyPings(() => ({ strings: [], values: [] }));

  // Return the index of `s` in the string table of `field`, appending it to
  // the table the first time it is seen.
  const internString = (field: IndexedStringPingField, s: string | null): StringIndex => {
    let lookup = lookupsByField.get(field);
    if (lookup === undefined) {
      lookup = new Map<string | null, number>();
      lookupsByField.set(field, lookup);
    }
    const known = lookup.get(s);
    if (known !== undefined) {
      return known;
    }
    const table = output[field].strings;
    const index = table.length;
    lookup.set(s, index);
    // Pings with null in a non-nullable field were already skipped by the
    // caller's check, so the value can be pushed whether or not it is null.
    table.push(s as any);
    return index;
  };

  let skipped = 0;
  for (const ping of data) {
    // Validation pass: the query should never yield null for these fields,
    // but checking here keeps the invariants of the produced data intact.
    let keep = true;
    for (const [k, desc] of pingFields()) {
      if (desc.nullable || ping[k] !== null) {
        continue;
      }
      console.warn(`Unexpected null in ${k}, omitting ping`);
      keep = false;
      break;
    }
    if (!keep) {
      skipped++;
      continue;
    }

    // Storage pass: append this ping's value to every column.
    for (const [k, desc] of pingFields()) {
      const inValue = ping[k];
      if (desc.type !== PingFieldType.IndexedString) {
        (output[k] as any[]).push(inValue);
        continue;
      }
      const field = k as IndexedStringPingField;
      output[field].values.push(internString(field, inValue as string | null));
    }
  }

  if (skipped > 0) {
    console.warn(`Unexpected data in ${skipped} pings`);
  }
  return output;
}
// Project id the BigQuery client is created with.
const BIGQUERY_PROJECT_ID = "moz-fx-data-shared-prod";

// Run the query stored in `query.sql` (in this module's directory) with `date`
// supplied as the `date` query parameter, and collect every streamed row.
//
// Throws if the GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable is
// unset or empty. The returned promise rejects if that variable is not valid
// JSON, if the SQL file cannot be read, or if the result stream emits an
// error.
async function fetchData(date: string): Promise<Ping[]> {
  const rawCredentials = process.env["GOOGLE_APPLICATION_CREDENTIALS_JSON"];
  if (!rawCredentials) {
    throw new Error("no google application credentials");
  }
  const credentials = JSON.parse(rawCredentials);

  const sqlPath = `${import.meta.dirname}/query.sql`;
  const query = await readFile(sqlPath, "utf8");

  const client = new BigQuery({ projectId: BIGQUERY_PROJECT_ID, credentials });
  const rows = client.createQueryStream({ query, params: { date } });

  // Buffer rows as they arrive; settle once the stream ends or fails.
  const collected: Ping[] = [];
  const pings = await new Promise<Ping[]>((resolve, reject) => {
    rows.on("error", reject);
    rows.on("data", (row) => {
      collected.push(row);
    });
    rows.on("end", () => {
      resolve(collected);
    });
  });
  return pings;
}
// Pre-shared key, read once at module load, that callers must present in the
// Authorization header.
const LOCAL_PSK = process.env["NETLIFY_LOCAL_PSK"];

// Function handler: fetch the pings for the requested date, condense and gzip
// them, and store the result in the "ping-data" blob store under that date.
//
// The request must carry `Authorization: PSK <NETLIFY_LOCAL_PSK>` and a JSON
// body of the form `{ "date": "<key>" }`.
//
// Throws "Unauthorized" when the key is not configured or does not match, and
// an "Invalid request" error when the body has no non-empty string `date`.
// A failure while fetching is logged and treated as "no pings" rather than
// aborting, so an (empty) blob with retry metadata is still written.
export default async (req: Request, _context: Context) => {
  // Only allow 'local' requests from our own functions.
  if (!LOCAL_PSK || req.headers.get("Authorization") !== `PSK ${LOCAL_PSK}`) {
    throw new Error("Unauthorized");
  }

  // The body is external input, so check its shape instead of trusting a type
  // assertion; otherwise a missing `date` would only surface later as an
  // unrelated failure when it is used as a blob key.
  const body: unknown = await req.json();
  const date =
    typeof body === "object" && body !== null
      ? (body as { date?: unknown }).date
      : undefined;
  if (typeof date !== "string" || date === "") {
    throw new Error("Invalid request: expected a JSON body with a non-empty string `date`");
  }

  const currentDate = new Date();
  const requestStore = getStore("ping-data-request");

  // Set a marker as a best-effort to prevent multiple concurrent (and
  // unnecessary) background functions for a particular date.
  // NOTE(review): if a later step throws, this marker is not removed --
  // confirm that readers of "ping-data-request" treat old markers as stale.
  await requestStore.set(date, currentDate.toISOString());

  // A failed fetch is deliberately downgraded to an empty result so that the
  // retry metadata below still gets written.
  let pings: Ping[] = [];
  try {
    pings = await fetchData(date);
  } catch (e) {
    console.error(e);
  }

  const metadata = {
    date: currentDate.toISOString(),
    // Epoch milliseconds after which an empty result may be retried; 0 when
    // pings were fetched.
    retry: pings.length === 0 ? currentDate.getTime() + EMPTY_RETRY_MS : 0,
    version: DATA_VERSION
  };

  const condensed = condenseData(pings);
  const gzipped = gzipSync(JSON.stringify(condensed));

  const store = getStore("ping-data");
  await store.set(date, new Blob([gzipped]), { metadata });

  // The data is stored; clear the in-progress marker.
  await requestStore.delete(date);
};