functions/schemaLoader.js (79 lines of code) (raw):

/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ const functions = require('firebase-functions'); const admin = require('firebase-admin'); const axios = require('axios'); const {BigQuery} = require('@google-cloud/bigquery'); /* Fetch version of latest schemas deployed to the Fenix Baseline table */ async function getGleanBuildIdLabel() { const bigquery = new BigQuery({projectId: "moz-fx-data-shared-prod"}); const datasetId = "org_mozilla_fenix_stable"; const tableId = "baseline_v1"; // Retrieve current dataset metadata. const table = bigquery.dataset(datasetId).table(tableId); const [metadata] = await table.getMetadata(); const labels = metadata.labels; let schemasBuildIdLabel = labels["schemas_build_id"]; if (!schemasBuildIdLabel) { console.error("`schemas_build_id` label is missing, can't update schema") } return schemasBuildIdLabel; } async function getLatestStoredSchemaTs() { try { const db = admin.firestore(); const latestSchema = await db.collection('glean_schemas').doc('latest').get(); if (!latestSchema.exists) { console.log('Latest glean schema doc not found in Firestore'); return null; } else { return latestSchema.data().deployTimestamp; } } catch (error) { console.error(error); return null; } } async function fetchGleanSchema(hash) { const schemaUrl = "https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas/" + hash + "/schemas/glean/glean/glean.1.schema.json"; return axios.get(schemaUrl) .then(response => JSON.stringify(response.data)) .catch(error => console.error(error)); } async function storeSchema(schema, schemaInfo) { const db = admin.firestore(); const batch = db.batch(); db.collection("glean_schemas"); const latestSchemaRef = db.collection("glean_schemas").doc("latest"); const byDateSchemaRef = db.collection("glean_schemas").doc(schemaInfo.timestamp); const schemaData = { deployTimestamp: schemaInfo.timestamp, commitHash: schemaInfo.hash, schema: schema, insertTs: Date.now(), }; batch.set(latestSchemaRef, schemaData); batch.set(byDateSchemaRef, schemaData); return batch.commit(); } exports.gleanSchemaLoader = functions.pubsub.schedule('*/15 * * * *').onRun(async (context) => { const latestStoredSchemaTs = await getLatestStoredSchemaTs(); const schemasBuildIdLabel = await getGleanBuildIdLabel(); const schemaInfo = extractSchemaInfo(schemasBuildIdLabel); const latestDeployedSchemaTs = schemaInfo.timestamp; console.log(`Latest schema timestamps:\nstored: ${latestStoredSchemaTs}\ndeployed: ${latestDeployedSchemaTs}`); if (latestStoredSchemaTs !== latestDeployedSchemaTs) { console.log("Fetching new schema..."); const gleanSchema = await fetchGleanSchema(schemaInfo.hash); return storeSchema(gleanSchema, schemaInfo) .then(_ => console.log("Done")) .catch(error => console.error(error)); } else { console.log("Schema is up to date, nothing to do"); return Promise.resolve(); } }); function extractSchemaInfo(schemasBuildIdLabel) { const split = schemasBuildIdLabel.split("_"); return {hash: split[1], timestamp: split[0]}; } exports.testables = { extractSchemaHash: extractSchemaInfo }