in typescript/src/link/link.ts [17:74]
/**
 * Enqueues on SQS the subscription reference of every checked subscription
 * that is not already stored in DynamoDB.
 *
 * The given checks are looked up by subscription id; any id returned by the
 * lookup is considered known and dropped. The `subscriptionReference` of each
 * remaining check is JSON-serialised and sent to the queue named by the
 * `QueueUrl` environment variable.
 *
 * @param subChecks - Subscriptions to check. When the same subscription id
 *   appears more than once, the last occurrence wins.
 * @returns The number of messages SQS reported as successfully enqueued
 *   (0 when there was nothing to send).
 * @throws Error when references need sending but `QueueUrl` is not set.
 * @throws ProcessingError when SQS reports at least one failed entry. Batches
 *   sent before the failing one have already been enqueued at that point.
 */
async function enqueueUnstoredPurchaseToken(
  subChecks: SubscriptionCheckData[],
): Promise<number> {
  // There's nothing to do if there are no subs to check - this will always be
  // the case for Feast.
  if (subChecks.length === 0) {
    return 0;
  }

  // Index by subscription id before querying. A Map (rather than a plain
  // object) keeps externally supplied ids away from Object.prototype keys,
  // and de-duplicating up front matters because DynamoDB's BatchGetItem
  // rejects a request that contains the same key twice.
  const unstoredById = new Map<string, SubscriptionCheckData>();
  for (const sub of subChecks) {
    unstoredById.set(sub.subscriptionId, sub);
  }

  const storedSubscriptions = dynamoMapper.batchGet(
    Array.from(unstoredById.keys(), (subscriptionId) =>
      new SubscriptionEmpty().setSubscriptionId(subscriptionId),
    ),
  );

  // Eliminate all known subscriptions.
  for await (const stored of storedSubscriptions) {
    unstoredById.delete(stored.subscriptionId);
  }

  const refsToSend = Array.from(
    unstoredById.values(),
    (sub) => sub.subscriptionReference,
  );
  if (refsToSend.length === 0) {
    return 0;
  }

  const queueUrl = process.env.QueueUrl;
  if (queueUrl === undefined) {
    throw new Error('No QueueUrl env parameter provided');
  }

  // SQS SendMessageBatch accepts at most 10 entries per request, so the
  // references are sent in chunks instead of one oversized batch.
  const maxBatchSize = 10;
  let sentCount = 0;
  for (let start = 0; start < refsToSend.length; start += maxBatchSize) {
    const entries: SendMessageBatchRequestEntry[] = refsToSend
      .slice(start, start + maxBatchSize)
      .map((subRef, offset) => ({
        // Entry ids only need to be unique within a single batch; the global
        // index satisfies that and keeps ids stable across chunks.
        Id: (start + offset).toString(),
        MessageBody: JSON.stringify(subRef),
      }));

    const result = await sqs
      .sendMessageBatch({ QueueUrl: queueUrl, Entries: entries })
      .promise();

    if (result.Failed && result.Failed.length > 0) {
      throw new ProcessingError(
        'Unable to send all the subscription reference to SQS, will retry',
        true,
      );
    }
    sentCount += result.Successful.length;
  }

  return sentCount;
}