poller-lambdas/localRun.ts (134 lines of code) (raw):

import type { SendMessageCommand } from '@aws-sdk/client-sqs';
import { SQSClient } from '@aws-sdk/client-sqs';
import prompts from 'prompts';
import type { PollerId } from '../shared/pollers';
import {
  getPollerSecretName,
  POLLER_LAMBDA_ENV_VAR_KEYS,
  POLLERS_CONFIG,
} from '../shared/pollers';
import { sqs } from './src/aws';
import { handlers } from './src/index';
import type { HandlerInputSqsPayload } from './src/types';

/** Visual divider printed around each invocation's console output. */
const SEPARATOR = '---------------------------------------------';

/**
 * Runs a single poller invocation against the fake queues, reports what the
 * handler sent, then interactively asks what to do next.
 *
 * After the handler resolves, the first message found on the poller's own
 * queue is removed and treated as the input for the next invocation; every
 * message on the ingestion queue is removed and printed.
 *
 * @param handler - Poller handler to invoke with a one-record SQS-style payload.
 * @param input - Body of the single fake record passed to the handler.
 * @param fakeQueues - Captured send commands, keyed by queue URL. The arrays
 *   are mutated (items are shifted off) by this function.
 * @param ownQueueUrl - Key in `fakeQueues` for the poller's own queue.
 * @param ingestionQueueUrl - Key in `fakeQueues` for the ingestion queue.
 * @throws Error if the handler did not leave a message (with a body) on its
 *   own queue, since there is then nothing to feed the next invocation.
 */
const fakeInvoke = async (
  handler: (sqsEvent: HandlerInputSqsPayload) => Promise<void>,
  input: string,
  fakeQueues: Record<string, SendMessageCommand[]>,
  ownQueueUrl: string,
  ingestionQueueUrl: string,
): Promise<void> => {
  console.log(SEPARATOR);
  console.log(`POLLER INVOCATION STARTED ${new Date().toISOString()}`);
  console.log(SEPARATOR);

  await handler({
    Records: [
      {
        body: input,
        messageId: 'FAKE_MESSAGE_ID',
      },
    ],
  });

  // Fall back to an empty list rather than asserting non-null: a queue that
  // received nothing may have no entry at all (presumably depends on how the
  // fake client populates `queueData` - verify against ./src/aws).
  const ownQueue = fakeQueues[ownQueueUrl] ?? [];
  const ingestionQueue = fakeQueues[ingestionQueueUrl] ?? [];

  const nextQueueItem = ownQueue.shift()?.input;
  const inputForNext = nextQueueItem?.MessageBody;
  const maybeDelaySeconds = nextQueueItem?.DelaySeconds;

  console.log(SEPARATOR);
  console.log(`POLLER INVOCATION COMPLETED ${new Date().toISOString()}`);
  if (maybeDelaySeconds) {
    console.log(` DelaySeconds: ${maybeDelaySeconds}`);
  }
  if (inputForNext !== undefined) {
    console.log(` input for next invocation: ${inputForNext}`);
  }

  // Drain and report ingestion items even when the run is about to fail, so
  // the output of this invocation is never lost.
  for (
    let sent = ingestionQueue.shift();
    sent !== undefined;
    sent = ingestionQueue.shift()
  ) {
    console.log(` ITEM SENT TO INGESTION QUEUE:`, sent.input);
  }
  console.log(SEPARATOR);

  if (inputForNext === undefined) {
    throw new Error(
      `Poller did not send a follow-up message to its own queue (${ownQueueUrl}), so there is no input for a next invocation`,
    );
  }

  // Fire-and-forget: the next invocation runs its own prompt loop.
  const invokeAgain = (): void => {
    void fakeInvoke(
      handler,
      inputForNext,
      fakeQueues,
      ownQueueUrl,
      ingestionQueueUrl,
    );
  };

  // Only offer the delayed option when the handler asked for a (non-zero) delay.
  const delayedChoices = maybeDelaySeconds
    ? [
        {
          title: `invoke after ${maybeDelaySeconds}s`,
          value: (): void => {
            console.log(
              `Waiting ${maybeDelaySeconds}s before invoking again...`,
            );
            setTimeout(invokeAgain, maybeDelaySeconds * 1000);
          },
        },
      ]
    : [];

  // Each choice's value is the action to run; the answer is absent when the
  // prompt is cancelled (e.g. Ctrl+C), hence the optional type.
  const { nextAction } = (await prompts({
    type: 'select',
    name: 'nextAction',
    message: 'What next?',
    choices: [
      ...delayedChoices,
      {
        title: 'Invoke again immediately',
        value: invokeAgain,
      },
      {
        title: 'Exit',
        value: (): void => {
          process.exit(0);
        },
      },
    ],
  })) as { nextAction?: () => void };

  if (typeof nextAction !== 'function') {
    // Cancelled prompt: behave like the explicit "Exit" choice.
    process.exit(0);
  }
  nextAction();
};

/**
 * Entry point: refuses to run against a real SQS client, points the poller's
 * queue env vars at fake URLs, asks which poller to run and with what starting
 * input, then kicks off the first fake invocation.
 */
void (async () => {
  if (sqs instanceof SQSClient) {
    throw new Error(
      'SQS appears to be using a real client - this file should be using a FAKE local SQS client',
    );
  }

  const ownQueueUrl = 'FAKE_OWN_QUEUE_URL';
  process.env[POLLER_LAMBDA_ENV_VAR_KEYS.OWN_QUEUE_URL] = ownQueueUrl;

  const ingestionQueueUrl = 'FAKE_INGESTION_QUEUE_URL';
  process.env[POLLER_LAMBDA_ENV_VAR_KEYS.INGESTION_LAMBDA_QUEUE_URL] =
    ingestionQueueUrl;

  const { pollerId } = (await prompts({
    type: 'select',
    name: 'pollerId',
    message: 'Which poller?',
    choices: Object.keys(POLLERS_CONFIG).map((id) => ({
      title: id,
      value: id,
    })),
  })) as { pollerId?: PollerId };

  if (pollerId === undefined) {
    // Prompt was cancelled: nothing to run.
    return;
  }

  process.env[POLLER_LAMBDA_ENV_VAR_KEYS.SECRET_NAME] = getPollerSecretName(
    'CODE',
    pollerId,
  );

  const handler = handlers[pollerId];

  const { input } = (await prompts({
    type: 'text',
    name: 'input',
    message: 'Do you want to provide a starting input?',
  })) as { input?: string };

  if (input === undefined) {
    // Prompt was cancelled (as opposed to submitted empty): do not invoke the
    // handler with an undefined body.
    return;
  }

  await fakeInvoke(
    handler,
    input,
    sqs.queueData,
    ownQueueUrl,
    ingestionQueueUrl,
  );
})();