projects/event-lambdas/src/redrive-from-S3-lambda/index.ts (73 lines of code) (raw):
import {
getEventsFromS3File,
putEventsToKinesisStream,
commenceScaleKinesisShardCount, augmentWithId,
} from "../lib/util";
import {s3} from "../lib/aws";
import {telemetryBucketName} from "../lib/constants";
import {Marker} from "aws-sdk/clients/s3";
// Millisecond durations used to budget work against the 15-minute lambda timeout.
const oneMinInMillis = 60 * 1000;
// Stop starting new pages of S3 files once we're within ~5 mins of the timeout.
const tenMinsInMillis = 10 * oneMinInMillis;
// Stop the scale-down polling loop once we're within ~2 mins of the timeout.
const thirteenMinsInMillis = 13 * oneMinInMillis;
// Sentinel value passed between step-function iterations (as both payload and
// return value) to signal that the re-drive is done and Kinesis is being scaled back down.
const SCALING_DOWN_KINESIS = "SCALING_DOWN_KINESIS" as const;
// Returned when the whole re-drive is complete (all files sent, shards back to 1).
type MarkerDone = null;
/**
 * Re-drives telemetry event files from S3 back into the Kinesis stream.
 *
 * Driven by a step function that loops on this lambda's return value:
 *  - a `Marker` (string) means "more files to process, resume listing from here";
 *  - `SCALING_DOWN_KINESIS` means "all files sent, keep halving shards";
 *  - `null` means the re-drive is fully complete.
 *
 * @param event step-function input; `Payload` is an S3 list marker, the
 *   `SCALING_DOWN_KINESIS` sentinel, or absent on the first iteration.
 * @returns the next marker, `SCALING_DOWN_KINESIS`, or `null` when done.
 * @throws if `Payload` is present but not a string.
 */
export const handler = async (
  // NOTE: `Marker` is an alias for `string`, so the previous annotation
  // `Marker | typeof SCALING_DOWN_KINESIS | unknown` silently collapsed to
  // `unknown`. Declare `unknown` honestly and narrow at runtime below.
  event: { Payload?: unknown }
): Promise<Marker | MarkerDone | typeof SCALING_DOWN_KINESIS> => {
  const startTimeEpoch = Date.now();

  // Scale-down phase: all files have been re-driven; halve the shard count
  // once a minute until Kinesis is back to a single shard.
  if (event?.Payload === SCALING_DOWN_KINESIS) {
    while (Date.now() - startTimeEpoch < thirteenMinsInMillis) {
      const targetShardCount = await commenceScaleKinesisShardCount("half");
      if (targetShardCount === 1) {
        console.log("Kinesis has scaled back down to 1 shard. Re-drive complete 🎉");
        return null;
      }
      await new Promise(resolve => setTimeout(resolve, oneMinInMillis));
    }
    return SCALING_DOWN_KINESIS; // too close to timing out, so finish the lambda and allow the step function to loop
  }

  if (typeof event?.Payload !== "undefined" && typeof event?.Payload !== "string") {
    console.error("Invalid input.", event);
    throw new Error("Invalid input. Expected string, got " + typeof event.Payload);
  }

  const maybeStartingMarker = event?.Payload;

  // Recursively process pages of S3 objects until the listing is exhausted
  // or we get too close to the lambda timeout.
  const processPageOfFiles = async (maybeMarker: Marker | undefined): Promise<Marker | MarkerDone> => {
    // we could instead turn on bucket inventory, and work through that
    // (this would mean we could start with latest, working back through time, as more recent is probably most useful/urgent)
    const pageOfFilesResponse = await s3.listObjects({
      Bucket: telemetryBucketName,
      Marker: maybeMarker,
      MaxKeys: 500, // although max is 1000, we want to ensure the lambda can send all the events before timing out and some event files are massive
    }).promise();

    const pageOfFiles = pageOfFilesResponse.Contents;
    if (!pageOfFiles || pageOfFiles.length === 0) {
      console.log("No files to process");
      return null;
    }
    console.log(`page of files: ${pageOfFiles.length}, starting at ${pageOfFilesResponse.Marker}`);

    // Read all files in the page concurrently; per-file failures are logged
    // and skipped (empty array) rather than failing the whole page.
    const eventsArrays = await Promise.all(pageOfFiles.map(async (file, index) => {
      const Key = file.Key;
      if (!Key) {
        console.error("No key found in file", file);
        return [];
      }
      console.log(`Attempting to read from file (${index + 1} of ${pageOfFiles.length}) at s3://${telemetryBucketName}/${Key}`);
      const maybeEvents = await getEventsFromS3File(Key);
      if (maybeEvents.error) {
        console.error(`Invalid data in file with key ${Key}`, maybeEvents.error);
        return [];
      }
      return maybeEvents.value.map(augmentWithId(Key));
    }));
    const events = eventsArrays.flat();

    await putEventsToKinesisStream(events, {shouldThrowOnError: true, shouldScaleOnDemand: true });

    if (!pageOfFilesResponse.IsTruncated) {
      console.log("No more files to process.");
      return SCALING_DOWN_KINESIS;
    }

    // FIX: previously `pageOfFiles[pageOfFiles.length - 1].Key!` — if the last
    // object somehow had no Key the non-null assertion yielded an undefined
    // marker, which would silently restart the listing from the top and
    // re-process the bucket until timeout. Fail loudly instead.
    const nextMarker = pageOfFilesResponse.NextMarker || pageOfFiles[pageOfFiles.length - 1].Key;
    if (!nextMarker) {
      throw new Error("Truncated listing returned no NextMarker and last object has no Key; cannot continue paging.");
    }

    // so long as we have at least five mins before 15min timeout, do another page
    if (Date.now() - startTimeEpoch < tenMinsInMillis) {
      return await processPageOfFiles(nextMarker);
    }
    return nextMarker; // too close to timing out, so finish the lambda and allow the step function to loop
  };

  return await processPageOfFiles(maybeStartingMarker || undefined); // coerce empty string etc to undefined, i.e. not starting marker;
};