export async function handler()

in typescript/src/export/exportHistoricalData.ts [87:159]


export async function handler(params: {
  date: string;
  maxMessagesToFetch?: number;
}): Promise<any> {
  const bucket = process.env['ExportBucket'];
  if (!bucket) throw new Error('Variable ExportBucket must be set');

  const sqsUrl = process.env['SqsUrl'];
  if (!sqsUrl) throw new Error('Variable SqsUrl must be set');

  const numberOfMessagesNotVisible = await getNumberOfMessagesNotVisible(
    sqsUrl,
  );
  if (numberOfMessagesNotVisible > 1) {
    throw new Error(
      `Approximately ${numberOfMessagesNotVisible} messages are unavailable for processing. Something else is currently consuming messages from ${sqsUrl}`,
    );
  }

  const zippedStream = zlib.createGzip();

  const maxMessagesToFetch = params.maxMessagesToFetch ?? Number.MAX_VALUE;

  const yesterday =
    params.date ?? plusDays(new Date(), -1).toISOString().substr(0, 10);
  const prefix = Stage === 'PROD' ? 'data' : 'code-data';
  const randomString = Math.random().toString(36).substring(10);
  const filename = `${prefix}/date=${yesterday}/${yesterday}-${randomString}.json.gz`;
  const managedUpload = s3.upload({
    Bucket: bucket,
    Key: filename,
    Body: zippedStream,
    ACL: 'bucket-owner-full-control',
  });

  const msgToDelete: string[] = [];
  let totalMsgCount = 0;
  let processedMsgCount = 0;

  function handleOneMessage(sqsMessage: Message): void {
    totalMsgCount++;
    const parsedMessage = JSON.parse(sqsMessage.Body ?? '');
    const messageDate = parsedMessage.snapshotDate.substr(0, 10);
    if (messageDate == yesterday) {
      processedMsgCount++;
      const message = JSON.stringify(parsedMessage) + '\n';
      zippedStream.write(message);
      if (sqsMessage.ReceiptHandle) msgToDelete.push(sqsMessage.ReceiptHandle);
    }
  }

  await recursivelyFetchSqsMessages(
    sqsUrl,
    maxMessagesToFetch,
    handleOneMessage,
  );

  zippedStream.end();
  await managedUpload.promise();

  console.log(
    `Export succeeded, read ${totalMsgCount} records, processed ${processedMsgCount}`,
  );

  await deleteAllSqsMessages(sqsUrl, msgToDelete);
  console.log(`Deleted ${msgToDelete.length} messages from the SQS queue`);

  return {
    date: yesterday,
    recordCount: totalMsgCount,
    processedCount: processedMsgCount,
  };
}