async function enqueueUnstoredPurchaseToken()

in typescript/src/link/link.ts [17:74]


/**
 * Sends to SQS the subscription reference of every entry in `subChecks` whose
 * subscription id is not returned by the DynamoDB batch lookup.
 *
 * Entries sharing a subscription id are collapsed into one (the last one
 * wins), so each id is looked up and enqueued at most once.
 *
 * @param subChecks - Subscriptions to check; may be empty.
 * @returns The number of messages SQS reported as successfully enqueued.
 * @throws Error if references must be sent but the `QueueUrl` environment
 *   variable is not set.
 * @throws ProcessingError if SQS reports any failed entry. Batches sent
 *   before the failing one have already been enqueued at that point.
 */
async function enqueueUnstoredPurchaseToken(
  subChecks: SubscriptionCheckData[],
): Promise<number> {
  // There's nothing to do if there are no subs to check - this will always be
  // the case for Feast.
  if (subChecks.length === 0) {
    return 0;
  }

  // SQS rejects a SendMessageBatch request carrying more than 10 entries.
  const sqsMaxBatchSize = 10;

  // Index by subscription id before querying, so that the lookup keys are
  // unique (DynamoDB BatchGetItem rejects duplicate keys). A Map is used
  // rather than a plain object so that ids can never collide with
  // Object.prototype members.
  const unstoredChecks = new Map<string, SubscriptionCheckData>();
  for (const sub of subChecks) {
    unstoredChecks.set(sub.subscriptionId, sub);
  }

  const dynamoResult = dynamoMapper.batchGet(
    Array.from(unstoredChecks.keys(), (subscriptionId) =>
      new SubscriptionEmpty().setSubscriptionId(subscriptionId),
    ),
  );

  // eliminate all known subscriptions
  for await (const result of dynamoResult) {
    unstoredChecks.delete(result.subscriptionId);
  }

  if (unstoredChecks.size === 0) {
    return 0;
  }

  const queueUrl = process.env.QueueUrl;
  if (queueUrl === undefined) {
    throw new Error('No QueueUrl env parameter provided');
  }

  // Ids only have to be unique within a single request; the global index
  // satisfies that for every chunk.
  const sqsMessages: SendMessageBatchRequestEntry[] = Array.from(
    unstoredChecks.values(),
    (sub, index) => ({
      Id: index.toString(),
      MessageBody: JSON.stringify(sub.subscriptionReference),
    }),
  );

  let sentCount = 0;
  for (
    let start = 0;
    start < sqsMessages.length;
    start += sqsMaxBatchSize
  ) {
    const result = await sqs
      .sendMessageBatch({
        QueueUrl: queueUrl,
        Entries: sqsMessages.slice(start, start + sqsMaxBatchSize),
      })
      .promise();
    if (result.Failed && result.Failed.length > 0) {
      // NOTE(review): the `true` flag presumably marks the error as
      // retryable, as the message suggests - confirm against ProcessingError.
      throw new ProcessingError(
        'Unable to send all the subscription reference to SQS, will retry',
        true,
      );
    }
    sentCount += result.Successful.length;
  }

  return sentCount;
}