in packages/database/src/core/Repo.ts [1045:1150]
function repoSendTransactionQueue(
repo: Repo,
path: Path,
queue: Transaction[]
): void {
// Mark transactions as sent and increment retry count!
const setsToIgnore = queue.map(txn => {
return txn.currentWriteId;
});
const latestState = repoGetLatestState(repo, path, setsToIgnore);
let snapToSend = latestState;
const latestHash = latestState.hash();
for (let i = 0; i < queue.length; i++) {
const txn = queue[i];
assert(
txn.status === TransactionStatus.RUN,
'tryToSendTransactionQueue_: items in queue should all be run.'
);
txn.status = TransactionStatus.SENT;
txn.retryCount++;
const relativePath = newRelativePath(path, txn.path);
// If we've gotten to this point, the output snapshot must be defined.
snapToSend = snapToSend.updateChild(
relativePath /** @type {!Node} */,
txn.currentOutputSnapshotRaw
);
}
const dataToSend = snapToSend.val(true);
const pathToSend = path;
// Send the put.
repo.server_.put(
pathToSend.toString(),
dataToSend,
(status: string) => {
repoLog(repo, 'transaction put response', {
path: pathToSend.toString(),
status
});
let events: Event[] = [];
if (status === 'ok') {
// Queue up the callbacks and fire them after cleaning up all of our
// transaction state, since the callback could trigger more
// transactions or sets.
const callbacks = [];
for (let i = 0; i < queue.length; i++) {
queue[i].status = TransactionStatus.COMPLETED;
events = events.concat(
syncTreeAckUserWrite(repo.serverSyncTree_, queue[i].currentWriteId)
);
if (queue[i].onComplete) {
// We never unset the output snapshot, and given that this
// transaction is complete, it should be set
callbacks.push(() =>
queue[i].onComplete(
null,
true,
queue[i].currentOutputSnapshotResolved
)
);
}
queue[i].unwatcher();
}
// Now remove the completed transactions.
repoPruneCompletedTransactionsBelowNode(
repo,
treeSubTree(repo.transactionQueueTree_, path)
);
// There may be pending transactions that we can now send.
repoSendReadyTransactions(repo, repo.transactionQueueTree_);
eventQueueRaiseEventsForChangedPath(repo.eventQueue_, path, events);
// Finally, trigger onComplete callbacks.
for (let i = 0; i < callbacks.length; i++) {
exceptionGuard(callbacks[i]);
}
} else {
// transactions are no longer sent. Update their status appropriately.
if (status === 'datastale') {
for (let i = 0; i < queue.length; i++) {
if (queue[i].status === TransactionStatus.SENT_NEEDS_ABORT) {
queue[i].status = TransactionStatus.NEEDS_ABORT;
} else {
queue[i].status = TransactionStatus.RUN;
}
}
} else {
warn(
'transaction at ' + pathToSend.toString() + ' failed: ' + status
);
for (let i = 0; i < queue.length; i++) {
queue[i].status = TransactionStatus.NEEDS_ABORT;
queue[i].abortReason = status;
}
}
repoRerunTransactions(repo, path);
}
},
latestHash
);
}