in packages/database/src/core/Repo.ts [1183:1335]
function repoRerunTransactionQueue(
repo: Repo,
queue: Transaction[],
path: Path
): void {
if (queue.length === 0) {
return; // Nothing to do!
}
// Queue up the callbacks and fire them after cleaning up all of our
// transaction state, since the callback could trigger more transactions or
// sets.
  const callbacks: Array<() => void> = [];
let events: Event[] = [];
// Ignore all of the sets we're going to re-run.
const txnsToRerun = queue.filter(q => {
return q.status === TransactionStatus.RUN;
});
const setsToIgnore = txnsToRerun.map(q => {
return q.currentWriteId;
});
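  // repoGetLatestState() below takes setsToIgnore as its exclude list, so each
  // node's current value is computed as if these in-flight writes never happened.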
for (let i = 0; i < queue.length; i++) {
const transaction = queue[i];
const relativePath = newRelativePath(path, transaction.path);
    let abortTransaction = false;
    let abortReason;
assert(
relativePath !== null,
      'repoRerunTransactionQueue: relativePath should not be null.'
);
if (transaction.status === TransactionStatus.NEEDS_ABORT) {
abortTransaction = true;
abortReason = transaction.abortReason;
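      // Acking the write with revert=true (the third argument) rolls this
      // transaction's local effect back out of the sync tree.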
events = events.concat(
syncTreeAckUserWrite(
repo.serverSyncTree_,
transaction.currentWriteId,
true
)
);
} else if (transaction.status === TransactionStatus.RUN) {
if (transaction.retryCount >= MAX_TRANSACTION_RETRIES) {
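        // Too many re-runs without a successful commit: give up, roll the
        // local write back, and surface 'maxretry' to onComplete as an Error.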
abortTransaction = true;
abortReason = 'maxretry';
events = events.concat(
syncTreeAckUserWrite(
repo.serverSyncTree_,
transaction.currentWriteId,
true
)
);
} else {
        // Re-run the transaction against the latest state of the node,
        // computed with this batch's pending re-run writes excluded.
const currentNode = repoGetLatestState(
repo,
transaction.path,
setsToIgnore
);
transaction.currentInputSnapshot = currentNode;
        const newData = transaction.update(currentNode.val());
if (newData !== undefined) {
validateFirebaseData(
'transaction failed: Data returned ',
newData,
transaction.path
);
let newDataNode = nodeFromJSON(newData);
const hasExplicitPriority =
typeof newData === 'object' &&
newData != null &&
contains(newData, '.priority');
if (!hasExplicitPriority) {
// Keep the old priority if there wasn't a priority explicitly specified.
newDataNode = newDataNode.updatePriority(currentNode.getPriority());
}
const oldWriteId = transaction.currentWriteId;
const serverValues = repoGenerateServerValues(repo);
const newNodeResolved = resolveDeferredValueSnapshot(
newDataNode,
currentNode,
serverValues
);
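          // newNodeResolved is the write with deferred values (e.g. the
          // {'.sv': 'timestamp'} placeholder) replaced by local estimates.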
transaction.currentOutputSnapshotRaw = newDataNode;
transaction.currentOutputSnapshotResolved = newNodeResolved;
transaction.currentWriteId = repoGetNextWriteId(repo);
          // Remove the old write id from setsToIgnore (mutating it in place)
          // so that transactions later in the queue see this re-applied write.
          setsToIgnore.splice(setsToIgnore.indexOf(oldWriteId), 1);
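          // Apply the new value under the new write id first, then ack/revert
          // the old write id, keeping the path covered by a user write
          // throughout.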
events = events.concat(
syncTreeApplyUserOverwrite(
repo.serverSyncTree_,
transaction.path,
newNodeResolved,
transaction.currentWriteId,
transaction.applyLocally
)
);
events = events.concat(
syncTreeAckUserWrite(repo.serverSyncTree_, oldWriteId, true)
);
} else {
abortTransaction = true;
abortReason = 'nodata';
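          // update() returned undefined, which by contract means "don't write
          // anything"; onComplete will fire with committed === false and the
          // current snapshot.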
events = events.concat(
syncTreeAckUserWrite(
repo.serverSyncTree_,
transaction.currentWriteId,
true
)
);
}
}
}
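    // Flush the events accumulated for this transaction before processing the
    // next one in the queue.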
eventQueueRaiseEventsForChangedPath(repo.eventQueue_, path, events);
events = [];
    if (abortTransaction) {
      // Abort.
      transaction.status = TransactionStatus.COMPLETED;
      // Removing a listener can trigger pruning, which can muck with
      // mergedData/visibleData (as it prunes data), so defer the unwatcher
      // until we're done.
      setTimeout(transaction.unwatcher, 0);
      if (transaction.onComplete) {
        if (abortReason === 'nodata') {
          callbacks.push(() =>
            transaction.onComplete(null, false, transaction.currentInputSnapshot)
          );
        } else {
          callbacks.push(() =>
            transaction.onComplete(new Error(abortReason), false, null)
          );
        }
      }
    }
}
// Clean up completed transactions.
repoPruneCompletedTransactionsBelowNode(repo, repo.transactionQueueTree_);
  // Now that we're back in a good, known state, fire the callbacks.
for (let i = 0; i < callbacks.length; i++) {
exceptionGuard(callbacks[i]);
}
// Try to send the transaction result to the server.
repoSendReadyTransactions(repo, repo.transactionQueueTree_);
}
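
For context, this retry machinery is what ultimately backs the public
runTransaction() API. A minimal usage sketch (hypothetical path and data,
run inside an async context; this is the public modular API, not the
internal function above):

import { getDatabase, ref, runTransaction } from 'firebase/database';

// Hypothetical counter node; the data shape is up to the app.
const counterRef = ref(getDatabase(), 'counters/pageviews');

const result = await runTransaction(counterRef, current => {
  if (current === null) {
    return 1; // Node doesn't exist yet: create it.
  }
  if (current >= 1000) {
    // Returning undefined takes the 'nodata' abort path above:
    // the transaction completes with committed === false and no Error.
    return undefined;
  }
  // If the server value changed underneath us, this updater is re-run,
  // up to MAX_TRANSACTION_RETRIES times before aborting with 'maxretry'.
  return current + 1;
});

console.log(result.committed, result.snapshot.val());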