in eventhandlers.js [36:82]
/**
 * Unwraps a GCS Pub/Sub notification and records the event through `bq.insert`.
 *
 * The base64-encoded `ctx.message.data` is decoded and parsed as JSON; its
 * `bucket`, `name` and `generation` fields form the row's `path` and `version`.
 * What happens next depends on `ctx.message.attributes.eventType`:
 *
 * - archive (object no longer current) / delete (object removed): the row gets
 *   an `info` JSON string with the event and storage type, then is inserted.
 * - finalize (object replaced with a new version): the object is downloaded
 *   into memory, its byte size is added to `info`, `getMetadata` fills
 *   `metadata`, then the row is inserted.
 * - metadata update, or any other event type: nothing is inserted.
 *
 * When `DEBUG_MODE` is truthy the row is logged as JSON in every case.
 *
 * @param {object} ctx - Carries `message.attributes` (`eventType`, `bucketId`,
 *   `objectId`) and the base64-encoded JSON `message.data`.
 * @param {object} perfCtx - Receives `addRef(name)` calls after each awaited step.
 * @returns {Promise<void>}
 */
async function handleGcsPubSubUnwrap(ctx, perfCtx) {
  const { message } = ctx;
  const { eventType, bucketId, objectId } = message.attributes;
  const payload = JSON.parse(Buffer.from(message.data, "base64").toString());

  const row = {
    timestamp: new Date(),
    path: `${payload.bucket}/${payload.name}`,
    version: payload.generation,
  };

  const isRemoval =
    eventType === consts.GCS_OBJ_ARCHIVE || eventType === consts.GCS_OBJ_DELETE;
  let shouldInsert = false;

  if (isRemoval) {
    // Archived or deleted objects have no content to inspect.
    const info = {
      event: eventType,
      storage: { type: consts.STORAGE_TYPE_GCS },
    };
    row.info = JSON.stringify(info);
    shouldInsert = true;
  } else if (eventType === consts.GCS_OBJ_FINALIZE) {
    // Read into memory so the container needs no volume mount.
    const contents = await gcs.downloadToMemory(bucketId, objectId);
    perfCtx.addRef("afterGcsDownloadToMemory");

    const info = {
      event: eventType,
      storage: { size: contents.length, type: consts.STORAGE_TYPE_GCS },
    };
    row.info = JSON.stringify(info);

    const objectUri = gcs.createUriPath(bucketId, objectId);
    row.metadata = await getMetadata(contents, objectUri);
    perfCtx.addRef("afterGetMetadata");
    shouldInsert = true;
  }
  // Metadata-update events (and any other event type) intentionally fall through.

  if (shouldInsert) {
    await bq.insert(row);
    perfCtx.addRef("afterBqInsert");
  }

  if (DEBUG_MODE) {
    console.log(JSON.stringify(row));
  }
}