glean/src/core/upload/worker.ts (93 lines of code) (raw):
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
import type { QueuedPing } from "./manager.js";
import type Uploader from "./uploader.js";
import type { UploadTask } from "./task.js";
import { Context } from "../context.js";
import log, { LoggingLevel } from "../log.js";
import Policy from "./policy.js";
import { UploadResult, UploadResultStatus } from "./uploader.js";
import { UploadTaskTypes } from "./task.js";
import { GLEAN_VERSION } from "../constants.js";
import { PingBodyOverflowError } from "./ping_body_overflow_error.js";
import PingRequest from "./ping_request.js";
const PING_UPLOAD_WORKER_LOG_TAG = "core.Upload.PingUploadWorker";
class PingUploadWorker {
// Whether or not someone is blocking on the currentJob.
isBlocking = false;
constructor(
private readonly uploader: Uploader,
private readonly serverEndpoint: string,
private readonly policy = new Policy()
) { }
/**
* Builds a ping request.
*
* This includes:
*
* 1. Includes Glean required headers to the ping;
* These are the headers described in https://mozilla.github.io/glean/book/user/pings/index.html?highlight=headers#submitted-headers
* 2. Stringifies the body.
*
* @param ping The ping to include the headers in.
* @returns The updated ping.
*/
private buildPingRequest(ping: QueuedPing): PingRequest<string | Uint8Array> {
let headers = ping.headers || {};
headers = {
...ping.headers,
"Content-Type": "application/json; charset=utf-8",
Date: new Date().toISOString(),
"X-Telemetry-Agent": `Glean/${GLEAN_VERSION} (JS on ${(Context.platform).info.os()})`
};
const stringifiedBody = JSON.stringify(ping.payload);
if (stringifiedBody.length > this.policy.maxPingBodySize) {
throw new PingBodyOverflowError(
`Body for ping ${ping.identifier} exceeds ${this.policy.maxPingBodySize}bytes. Discarding.`
);
}
headers["Content-Length"] = stringifiedBody.length.toString();
return new PingRequest(ping.identifier, headers, stringifiedBody, this.policy.maxPingBodySize);
}
/**
* Attempts to upload a ping.
*
* @param ping The ping object containing headers and payload.
* @returns The status number of the response or `undefined` if unable to attempt upload.
*/
private async attemptPingUpload(ping: QueuedPing): Promise<UploadResult> {
try {
const finalPing = this.buildPingRequest(ping);
// The POST call has to be asynchronous. Once the API call is triggered,
// we rely on the browser's "keepalive" header.
return this.uploader.post(
`${this.serverEndpoint}${ping.path}`,
finalPing
);
} catch (e) {
log(
PING_UPLOAD_WORKER_LOG_TAG,
["Error trying to build or post ping request:", e],
LoggingLevel.Warn
);
// An unrecoverable failure will make sure the offending ping is removed from the queue and
// deleted from the database, which is what we want here.
return new UploadResult(UploadResultStatus.UnrecoverableFailure);
}
}
/**
* Kick start non-blocking asynchronous internal loop to get and act on upload tasks.
*
* If a job is currently ongoing, this is a no-op.
*
* @param getUploadTask A function that returns an UploadTask.
* @param processUploadResponse A function that processes an UploadResponse.
*/
work(
getUploadTask: () => UploadTask,
processUploadResponse: (ping: QueuedPing, result: UploadResult) => void
): void {
while (true) {
try {
const task = getUploadTask();
switch (task.type) {
case UploadTaskTypes.Upload: {
if (this.isBlocking) {
return;
}
this.attemptPingUpload(task.ping)
.then((result) => {
processUploadResponse(task.ping, result);
})
.catch((error) => {
console.log(error);
});
continue;
}
case UploadTaskTypes.Done:
return;
}
} catch (error) {
log(
PING_UPLOAD_WORKER_LOG_TAG,
["IMPOSSIBLE: Something went wrong while processing ping upload tasks.", error],
LoggingLevel.Error
);
}
}
}
/**
* Block the uploader from uploading pings.
*/
blockUploading() {
this.isBlocking = true;
}
/**
* Resume the uploader to upload pings.
*/
resumeUploading() {
this.isBlocking = false;
}
}
export default PingUploadWorker;