in typescript/src/export/exportHistoricalData.ts [87:159]
/**
 * Exports the SQS messages belonging to a single day into one gzipped,
 * newline-delimited JSON object in S3, then deletes the exported messages
 * from the queue.
 *
 * Messages are matched on the first 10 characters of their `snapshotDate`
 * field. Messages for any other day are counted in `recordCount` but are
 * neither written to the export nor scheduled for deletion.
 *
 * Deletion only happens after the S3 upload has completed, so a failed upload
 * does not remove anything from the queue.
 *
 * @param params.date - Day to export, compared against the first 10
 *   characters of each message's `snapshotDate`. When absent at runtime,
 *   yesterday's date (from `toISOString()`) is used.
 * @param params.maxMessagesToFetch - Upper bound passed to
 *   `recursivelyFetchSqsMessages`; defaults to `Number.MAX_VALUE`.
 * @returns An object `{ date, recordCount, processedCount }` where
 *   `recordCount` is every message seen and `processedCount` is the number
 *   written to the export.
 * @throws Error if the `ExportBucket` or `SqsUrl` environment variable is
 *   unset, or if more than one message is currently reported as not visible
 *   on the queue. Errors from message parsing, fetching, uploading or
 *   deleting propagate unchanged.
 */
export async function handler(params: {
  date: string;
  maxMessagesToFetch?: number;
}): Promise<any> {
  const bucket = process.env['ExportBucket'];
  if (!bucket) throw new Error('Variable ExportBucket must be set');
  const sqsUrl = process.env['SqsUrl'];
  if (!sqsUrl) throw new Error('Variable SqsUrl must be set');

  // Refuse to run when messages are in flight: another consumer holding
  // messages would make this export incomplete. A count of 0 or 1 is tolerated.
  const numberOfMessagesNotVisible = await getNumberOfMessagesNotVisible(
    sqsUrl,
  );
  if (numberOfMessagesNotVisible > 1) {
    throw new Error(
      `Approximately ${numberOfMessagesNotVisible} messages are unavailable for processing. Something else is currently consuming messages from ${sqsUrl}`,
    );
  }

  const zippedStream = zlib.createGzip();
  const maxMessagesToFetch = params.maxMessagesToFetch ?? Number.MAX_VALUE;
  const yesterday =
    params.date ?? plusDays(new Date(), -1).toISOString().substring(0, 10);
  const prefix = Stage === 'PROD' ? 'data' : 'code-data';

  // Math.random().toString(36) looks like "0.xxxxxxxxxxx"; take the characters
  // right after "0." so the suffix is (almost always) 8 characters long.
  // Slicing from index 10 onwards left only 0-3 characters, which made key
  // collisions between exports of the same date likely.
  const randomString = Math.random().toString(36).substring(2, 10);
  const filename = `${prefix}/date=${yesterday}/${yesterday}-${randomString}.json.gz`;

  // NOTE(review): the upload is only awaited after every message has been
  // written to the gzip stream. If the SDK defers sending until `.promise()`
  // is called, the whole compressed payload is buffered in memory first —
  // confirm against the S3 client in use.
  const managedUpload = s3.upload({
    Bucket: bucket,
    Key: filename,
    Body: zippedStream,
    ACL: 'bucket-owner-full-control',
  });

  const msgToDelete: string[] = [];
  let totalMsgCount = 0;
  let processedMsgCount = 0;

  // Writes a message to the export when its snapshot date matches the target
  // day, and remembers its receipt handle for deletion after the upload.
  function handleOneMessage(sqsMessage: Message): void {
    totalMsgCount++;
    const parsedMessage = JSON.parse(sqsMessage.Body ?? '');
    const messageDate = parsedMessage.snapshotDate.substring(0, 10);
    if (messageDate === yesterday) {
      processedMsgCount++;
      // Re-serialise so each record occupies exactly one line.
      const message = JSON.stringify(parsedMessage) + '\n';
      zippedStream.write(message);
      if (sqsMessage.ReceiptHandle) msgToDelete.push(sqsMessage.ReceiptHandle);
    }
  }

  try {
    await recursivelyFetchSqsMessages(
      sqsUrl,
      maxMessagesToFetch,
      handleOneMessage,
    );
  } catch (err) {
    // Release the gzip stream instead of leaving it open when fetching or
    // parsing fails; the original error is rethrown untouched.
    zippedStream.destroy();
    throw err;
  }

  zippedStream.end();
  await managedUpload.promise();
  console.log(
    `Export succeeded, read ${totalMsgCount} records, processed ${processedMsgCount}`,
  );

  // Only delete once the export is safely stored.
  await deleteAllSqsMessages(sqsUrl, msgToDelete);
  console.log(`Deleted ${msgToDelete.length} messages from the SQS queue`);

  return {
    date: yesterday,
    recordCount: totalMsgCount,
    processedCount: processedMsgCount,
  };
}