function repoSendTransactionQueue()

in packages/database/src/core/Repo.ts [1045:1150]


function repoSendTransactionQueue(
  repo: Repo,
  path: Path,
  queue: Transaction[]
): void {
  // Mark transactions as sent and increment retry count!
  const setsToIgnore = queue.map(txn => {
    return txn.currentWriteId;
  });
  const latestState = repoGetLatestState(repo, path, setsToIgnore);
  let snapToSend = latestState;
  const latestHash = latestState.hash();
  for (let i = 0; i < queue.length; i++) {
    const txn = queue[i];
    assert(
      txn.status === TransactionStatus.RUN,
      'tryToSendTransactionQueue_: items in queue should all be run.'
    );
    txn.status = TransactionStatus.SENT;
    txn.retryCount++;
    const relativePath = newRelativePath(path, txn.path);
    // If we've gotten to this point, the output snapshot must be defined.
    snapToSend = snapToSend.updateChild(
      relativePath /** @type {!Node} */,
      txn.currentOutputSnapshotRaw
    );
  }

  const dataToSend = snapToSend.val(true);
  const pathToSend = path;

  // Send the put.
  repo.server_.put(
    pathToSend.toString(),
    dataToSend,
    (status: string) => {
      repoLog(repo, 'transaction put response', {
        path: pathToSend.toString(),
        status
      });

      let events: Event[] = [];
      if (status === 'ok') {
        // Queue up the callbacks and fire them after cleaning up all of our
        // transaction state, since the callback could trigger more
        // transactions or sets.
        const callbacks = [];
        for (let i = 0; i < queue.length; i++) {
          queue[i].status = TransactionStatus.COMPLETED;
          events = events.concat(
            syncTreeAckUserWrite(repo.serverSyncTree_, queue[i].currentWriteId)
          );
          if (queue[i].onComplete) {
            // We never unset the output snapshot, and given that this
            // transaction is complete, it should be set
            callbacks.push(() =>
              queue[i].onComplete(
                null,
                true,
                queue[i].currentOutputSnapshotResolved
              )
            );
          }
          queue[i].unwatcher();
        }

        // Now remove the completed transactions.
        repoPruneCompletedTransactionsBelowNode(
          repo,
          treeSubTree(repo.transactionQueueTree_, path)
        );
        // There may be pending transactions that we can now send.
        repoSendReadyTransactions(repo, repo.transactionQueueTree_);

        eventQueueRaiseEventsForChangedPath(repo.eventQueue_, path, events);

        // Finally, trigger onComplete callbacks.
        for (let i = 0; i < callbacks.length; i++) {
          exceptionGuard(callbacks[i]);
        }
      } else {
        // transactions are no longer sent.  Update their status appropriately.
        if (status === 'datastale') {
          for (let i = 0; i < queue.length; i++) {
            if (queue[i].status === TransactionStatus.SENT_NEEDS_ABORT) {
              queue[i].status = TransactionStatus.NEEDS_ABORT;
            } else {
              queue[i].status = TransactionStatus.RUN;
            }
          }
        } else {
          warn(
            'transaction at ' + pathToSend.toString() + ' failed: ' + status
          );
          for (let i = 0; i < queue.length; i++) {
            queue[i].status = TransactionStatus.NEEDS_ABORT;
            queue[i].abortReason = status;
          }
        }

        repoRerunTransactions(repo, path);
      }
    },
    latestHash
  );
}