await metricScope()

in src/package-sources/npmjs/npm-js-follower.lambda.ts [72:152]


    await metricScope((metrics) => async () => {
      const changes = await npm.changes(updatedMarker);

      // Clear automatically set dimensions - we don't need them (see https://github.com/awslabs/aws-embedded-metrics-node/issues/73)
      metrics.setDimensions();

      // Recording current seq range and updating the `updatedMarker`.
      metrics.setProperty('StartSeq', updatedMarker);
      updatedMarker = changes.last_seq;
      metrics.setProperty('EndSeq', updatedMarker);

      const startTime = Date.now();

      try {
        const batch = changes.results as readonly Change[];

        // The most recent "modified" timestamp observed in the batch.
        let lastModified: Date | undefined;
        // Emit npm.js replication lag
        for (const { doc } of batch) {
          if (doc?.time?.modified) {
            const modified = new Date(doc.time.modified);
            metrics.putMetric(
              MetricName.NPMJS_CHANGE_AGE,
              startTime - modified.getTime(),
              Unit.Milliseconds,
            );
            if (lastModified == null || lastModified < modified) {
              lastModified = modified;
            }
          }
        }

        console.log(`Received a batch of ${batch.length} element(s)`);
        metrics.putMetric(MetricName.CHANGE_COUNT, batch.length, Unit.Count);

        if (lastModified && lastModified < DAWN_OF_CONSTRUCTS) {
          console.log(`Skipping batch as the latest modification is ${lastModified}, which is pre-Constructs`);
        } else if (batch.length === 0) {
          console.log('Received 0 changes, caught up to "now", exiting...');
          shouldContinue = false;
        } else {
          // Obtain the modified package version from the update event, and filter
          // out packages that are not of interest to us (not construct libraries).
          const versionInfos = getRelevantVersionInfos(batch, metrics, denyList, licenseList, knownVersions);
          console.log(`Identified ${versionInfos.length} relevant package version update(s)`);
          metrics.putMetric(MetricName.RELEVANT_PACKAGE_VERSIONS, versionInfos.length, Unit.Count);

          // Process all remaining updates
          await Promise.all(versionInfos.map(async ({ infos, modified, seq }) => {
            const invokeArgs: PackageVersion = {
              integrity: infos.dist.shasum,
              modified: modified.toISOString(),
              name: infos.name,
              seq: seq?.toString(),
              tarballUrl: infos.dist.tarball,
              version: infos.version,
            };
            // "Fire-and-forget" invocation here.
            await aws.lambda().invokeAsync({
              FunctionName: stagingFunction,
              InvokeArgs: JSON.stringify(invokeArgs, null, 2),
            }).promise();
            // Record that this is now a "known" version (no need to re-discover)
            knownVersions.set(`${infos.name}@${infos.version}`, modified);
          }));
        }

        // Updating the S3 stored marker with the new seq id as communicated by nano.
        await saveLastTransactionMarker(context, stagingBucket, updatedMarker, knownVersions);

      } finally {
        // Markers may not always be numeric (but in practice they are now), so we protect against that...
        if (typeof updatedMarker === 'number' || /^\d+$/.test(updatedMarker)) {
          metrics.putMetric(MetricName.LAST_SEQ, typeof updatedMarker === 'number' ? updatedMarker : parseInt(updatedMarker), Unit.None);
        }

        metrics.putMetric(MetricName.BATCH_PROCESSING_TIME, Date.now() - startTime, Unit.Milliseconds);
        metrics.putMetric(MetricName.REMAINING_TIME, context.getRemainingTimeInMillis(), Unit.Milliseconds);
      }
    })();