poller-lambdas/src/index.ts (130 lines of code) (raw):
import {
GetSecretValueCommand,
PutSecretValueCommand,
} from '@aws-sdk/client-secrets-manager';
import type { SendMessageCommandInput } from '@aws-sdk/client-sqs';
import { SendMessageCommand } from '@aws-sdk/client-sqs';
import {
POLLER_FAILURE_EVENT_TYPE,
POLLER_INVOCATION_EVENT_TYPE,
} from '../../shared/constants';
import { createLogger } from '../../shared/lambda-logging';
import type { PollerId } from '../../shared/pollers';
import { POLLER_LAMBDA_ENV_VAR_KEYS } from '../../shared/pollers';
import { queueNextInvocation, secretsManager, sqs } from './aws';
import { getEnvironmentVariableOrCrash } from './config';
import { apPoller } from './pollers/ap/apPoller';
import { reutersPoller } from './pollers/reuters/reutersPoller';
import type { HandlerInputSqsPayload, PollFunction } from './types';
import { isFixedFrequencyPollOutput } from './types';
const pollerWrapper =
(pollerFunction: PollFunction) =>
async ({ Records }: HandlerInputSqsPayload) => {
const logger = createLogger({
sqsMessageId: Records.map((record) => record.messageId).join(', '),
});
logger.log({
message: `Poller lambda invoked with SQS message id: ${Records.map((record) => record.messageId).join(', ')}`,
eventType: POLLER_INVOCATION_EVENT_TYPE,
});
const startTimeEpochMillis = Date.now();
const secretName = getEnvironmentVariableOrCrash(
POLLER_LAMBDA_ENV_VAR_KEYS.SECRET_NAME,
);
const secret = await secretsManager
.send(
new GetSecretValueCommand({
SecretId: secretName,
}),
)
.then((_) => _.SecretString);
if (!secret) {
throw new Error(
`Secret not found at: ${POLLER_LAMBDA_ENV_VAR_KEYS.SECRET_NAME}`,
);
}
if (Records.length != 1) {
console.warn('Expected exactly one SQS record, but got', Records.length);
}
for (const record of Records) {
const valueFromPreviousPoll = record.body;
await pollerFunction({ secret, input: valueFromPreviousPoll, logger })
.then(async (output) => {
const endTimeEpochMillis = Date.now();
const messagesForIngestionLambda = Array.isArray(
output.payloadForIngestionLambda,
)
? output.payloadForIngestionLambda
: [output.payloadForIngestionLambda];
for (const { externalId, body } of messagesForIngestionLambda) {
console.log(
`Sending message to ingestion lambda with id: ${externalId}.`,
);
const message: SendMessageCommandInput = {
QueueUrl: getEnvironmentVariableOrCrash(
POLLER_LAMBDA_ENV_VAR_KEYS.INGESTION_LAMBDA_QUEUE_URL,
),
MessageBody: JSON.stringify(body),
MessageAttributes: {
'Message-Id': {
StringValue: externalId,
DataType: 'String',
},
},
};
await sqs.send(new SendMessageCommand(message)).catch((error) => {
logger.error({
message: `Sending to queue failed for ${externalId}`,
error: error instanceof Error ? error.message : error,
queueMessage: JSON.stringify(message),
});
throw error; // we still expect this to be terminal for the poller lambda
});
}
if (isFixedFrequencyPollOutput(output)) {
const safeIdealFrequencyInSeconds = Math.max(
5,
output.idealFrequencyInSeconds,
);
const remainingMillisBeforeNextInterval =
safeIdealFrequencyInSeconds * 1000 -
(endTimeEpochMillis - startTimeEpochMillis);
const delayInSeconds = Math.max(
remainingMillisBeforeNextInterval / 1000,
0,
);
await queueNextInvocation({
DelaySeconds: delayInSeconds,
MessageBody: output.valueForNextPoll,
});
} else {
await queueNextInvocation({
MessageBody: output.valueForNextPoll,
});
}
if (output.newSecretValue) {
// set new value in secrets manager
console.log(`Updating secret value for ${secretName}`);
await secretsManager.send(
new PutSecretValueCommand({
SecretId: secretName,
SecretString: output.newSecretValue,
}),
);
}
})
.catch((error) => {
logger.error({
message: `Poller lambda failed with message: ${error instanceof Error ? error.message : error}`,
sqsMessageId: Records.map((record) => record.messageId).join(', '),
eventType: POLLER_FAILURE_EVENT_TYPE,
pollerName: pollerFunction.name,
});
// consider still queuing next (perhaps with default delay or 1min) to avoid the lambda from stopping entirely
throw error;
});
}
};
export const handlers = {
// EXAMPLE_long_polling: pollerWrapper(EXAMPLE_long_polling),
// EXAMPLE_fixed_frequency: pollerWrapper(EXAMPLE_fixed_frequency),
reuters: pollerWrapper(reutersPoller),
apPoller: pollerWrapper(apPoller),
} satisfies Record<
PollerId,
(sqsEvent: HandlerInputSqsPayload) => Promise<void>
>;