in src/package-sources/npmjs/npm-js-follower.lambda.ts [72:152]
await metricScope((metrics) => async () => {
  // Ask the npm changes feed for the next batch, passing the current marker.
  const response = await npm.changes(updatedMarker);

  // Drop the automatically set dimensions, which are not needed here
  // (see https://github.com/awslabs/aws-embedded-metrics-node/issues/73).
  metrics.setDimensions();

  // Record the seq range covered by this batch, advancing `updatedMarker`
  // to the feed's `last_seq` along the way.
  metrics.setProperty('StartSeq', updatedMarker);
  updatedMarker = response.last_seq;
  metrics.setProperty('EndSeq', updatedMarker);

  const batchStart = Date.now();
  try {
    const batch = response.results as readonly Change[];

    // Emit the npm.js replication lag for every change that carries a
    // "modified" timestamp, while tracking the most recent one in the batch.
    let newestModified: Date | undefined;
    for (const change of batch) {
      const rawModified = change.doc?.time?.modified;
      if (!rawModified) {
        continue;
      }
      const modifiedAt = new Date(rawModified);
      metrics.putMetric(MetricName.NPMJS_CHANGE_AGE, batchStart - modifiedAt.getTime(), Unit.Milliseconds);
      if (newestModified === undefined || newestModified < modifiedAt) {
        newestModified = modifiedAt;
      }
    }

    console.log(`Received a batch of ${batch.length} element(s)`);
    metrics.putMetric(MetricName.CHANGE_COUNT, batch.length, Unit.Count);

    if (newestModified !== undefined && newestModified < DAWN_OF_CONSTRUCTS) {
      // Even the newest change in the batch is older than DAWN_OF_CONSTRUCTS.
      console.log(`Skipping batch as the latest modification is ${newestModified}, which is pre-Constructs`);
    } else if (batch.length > 0) {
      // Extract the modified package versions from the update events, keeping
      // only those of interest to us (construct libraries).
      const versionInfos = getRelevantVersionInfos(batch, metrics, denyList, licenseList, knownVersions);
      console.log(`Identified ${versionInfos.length} relevant package version update(s)`);
      metrics.putMetric(MetricName.RELEVANT_PACKAGE_VERSIONS, versionInfos.length, Unit.Count);

      // Hand every remaining update over to the staging function, concurrently.
      const invocations = versionInfos.map(async (versionInfo) => {
        const { infos, modified, seq } = versionInfo;
        const payload: PackageVersion = {
          integrity: infos.dist.shasum,
          modified: modified.toISOString(),
          name: infos.name,
          seq: seq?.toString(),
          tarballUrl: infos.dist.tarball,
          version: infos.version,
        };

        // "Fire-and-forget" invocation of the staging function.
        await aws.lambda().invokeAsync({
          FunctionName: stagingFunction,
          InvokeArgs: JSON.stringify(payload, null, 2),
        }).promise();

        // This version is now "known", so it does not need to be re-discovered.
        knownVersions.set(`${infos.name}@${infos.version}`, modified);
      });
      await Promise.all(invocations);
    } else {
      console.log('Received 0 changes, caught up to "now", exiting...');
      shouldContinue = false;
    }

    // Persist the new seq marker (together with the known versions) via
    // saveLastTransactionMarker.
    await saveLastTransactionMarker(context, stagingBucket, updatedMarker, knownVersions);
  } finally {
    // Markers may not always be numeric (in practice they currently are), so
    // LAST_SEQ is only emitted for a number or an all-digits string.
    let numericMarker: number | undefined;
    if (typeof updatedMarker === 'number') {
      numericMarker = updatedMarker;
    } else if (/^\d+$/.test(updatedMarker)) {
      numericMarker = parseInt(updatedMarker);
    }
    if (numericMarker !== undefined) {
      metrics.putMetric(MetricName.LAST_SEQ, numericMarker, Unit.None);
    }

    // These two metrics are emitted whether or not processing succeeded.
    metrics.putMetric(MetricName.BATCH_PROCESSING_TIME, Date.now() - batchStart, Unit.Milliseconds);
    metrics.putMetric(MetricName.REMAINING_TIME, context.getRemainingTimeInMillis(), Unit.Milliseconds);
  }
})();