in src/main/java/com/google/firebase/database/core/Repo.java [936:1074]
/**
 * Re-evaluates each transaction in {@code queue} against the latest local state, then prunes
 * completed transactions, fires the completion callbacks of aborted transactions, and finally
 * asks for all ready transactions to be sent.
 *
 * <p>Per transaction, depending on its status:
 *
 * <ul>
 *   <li>{@code NEEDS_ABORT}: the transaction is aborted with its stored abort reason. Its
 *       current write is reverted unless the reason code is {@code WRITE_CANCELED}.
 *   <li>{@code RUN} with {@code retryCount >= TRANSACTION_MAX_RETRIES}: the transaction is
 *       aborted with a too-many-retries error and its current write is reverted.
 *   <li>{@code RUN} otherwise: the handler is invoked again on the latest state. A successful
 *       result is applied as a new user overwrite under a fresh write id and the previous write
 *       is reverted; an abort result (including one produced because the handler threw) aborts
 *       the transaction and reverts its current write.
 *   <li>Any other status: only an (empty) event list is posted; the transaction is untouched.
 * </ul>
 *
 * @param queue the transactions to re-run; the method returns immediately if it is empty
 * @param path the path each transaction's own path is made relative to (the result is only
 *     asserted to be non-null)
 */
private void rerunTransactionQueue(List<TransactionData> queue, Path path) {
  if (queue.isEmpty()) {
    return; // Nothing to re-run.
  }

  // Completion callbacks are collected here and fired only after all transaction bookkeeping is
  // done, because a callback may itself start further transactions or sets.
  List<Runnable> deferredCallbacks = new ArrayList<>();

  // Every queued transaction is being re-run, so its current write must not contribute to the
  // state the handlers see. Specific ids (not a range) are tracked so that writes created during
  // this re-run remain visible to the transactions that follow.
  List<Long> ignoredWriteIds = new ArrayList<>();
  for (TransactionData queued : queue) {
    ignoredWriteIds.add(queued.currentWriteId);
  }

  for (final TransactionData txn : queue) {
    Path pathWithinQueueRoot = Path.getRelative(path, txn.path);
    // Sanity check only: the transaction's path is expected to be expressible relative to path.
    assert pathWithinQueueRoot != null;

    List<Event> pendingEvents = new ArrayList<>();
    DatabaseError reason = null;
    boolean aborted = false;
    // Set when the write currently held by this transaction has to be rolled back. All abort
    // branches below share the single revert call that follows the status handling.
    boolean revertCurrentWrite = false;

    if (txn.status == TransactionStatus.NEEDS_ABORT) {
      aborted = true;
      reason = txn.abortReason;
      // No revert is issued when the reason is WRITE_CANCELED.
      revertCurrentWrite = reason.getCode() != DatabaseError.WRITE_CANCELED;
    } else if (txn.status == TransactionStatus.RUN) {
      boolean retriesExhausted = txn.retryCount >= TRANSACTION_MAX_RETRIES;
      if (retriesExhausted) {
        aborted = true;
        reason = DatabaseError.fromStatus(TRANSACTION_TOO_MANY_RETRIES);
        revertCurrentWrite = true;
      } else {
        // Re-run the handler against the latest state, excluding the ignored writes.
        Node input = getLatestState(txn.path, ignoredWriteIds);
        txn.currentInputSnapshot = input;
        MutableData mutableInput = InternalHelpers.createMutableData(input);

        DatabaseError handlerError = null;
        Transaction.Result outcome;
        try {
          outcome = txn.handler.doTransaction(mutableInput);
        } catch (Throwable thrown) {
          // A throwing handler is treated as an abort that carries the converted error.
          handlerError = DatabaseError.fromException(thrown);
          outcome = Transaction.abort();
        }

        if (!outcome.isSuccess()) {
          // Either the handler chose to abort (handlerError is null, so no error is reported)
          // or it threw (handlerError holds the failure).
          aborted = true;
          reason = handlerError;
          revertCurrentWrite = true;
        } else {
          final Long supersededWriteId = txn.currentWriteId;
          Map<String, Object> serverValues = ServerValues.generateServerValues(serverClock);
          Node rawOutput = outcome.getNode();
          Node resolvedOutput =
              ServerValues.resolveDeferredValueSnapshot(rawOutput, serverValues);

          txn.currentOutputSnapshotRaw = rawOutput;
          txn.currentOutputSnapshotResolved = resolvedOutput;
          txn.currentWriteId = getNextWriteId();

          // The superseded id is a boxed Long, so this removes by value (not by index). From
          // here on that id is no longer in the ignore list used for later transactions.
          ignoredWriteIds.remove(supersededWriteId);

          // Apply the new write first, then revert the one it replaces.
          pendingEvents.addAll(
              serverSyncTree.applyUserOverwrite(
                  txn.path,
                  rawOutput,
                  resolvedOutput,
                  txn.currentWriteId,
                  txn.applyLocally,
                  /*persist=*/ false));
          pendingEvents.addAll(
              serverSyncTree.ackUserWrite(
                  supersededWriteId, /*revert=*/ true, /*persist=*/ false, serverClock));
        }
      }
    }

    if (revertCurrentWrite) {
      pendingEvents.addAll(
          serverSyncTree.ackUserWrite(
              txn.currentWriteId, /*revert=*/ true, /*persist=*/ false, serverClock));
    }
    postEvents(pendingEvents);

    if (aborted) {
      txn.status = TransactionStatus.COMPLETED;
      final DatabaseReference txnRef = InternalHelpers.createReference(this, txn.path);
      // The completion callback receives a snapshot of the last input the transaction saw.
      // TODO: In the future, perhaps this should just be KeyIndex?
      final DataSnapshot lastInputSnapshot =
          InternalHelpers.createDataSnapshot(txnRef, IndexedNode.from(txn.currentInputSnapshot));

      // Removing the listener can trigger pruning, which can disturb the data being worked on
      // here, so the removal is scheduled rather than performed inline.
      scheduleNow(
          new Runnable() {
            @Override
            public void run() {
              removeEventCallback(
                  new ValueEventRegistration(
                      Repo.this,
                      txn.outstandingListener,
                      QuerySpec.defaultQueryAtPath(txn.path)));
            }
          });

      final DatabaseError reportedError = reason;
      deferredCallbacks.add(
          new Runnable() {
            @Override
            public void run() {
              runTransactionOnComplete(txn.handler, reportedError, false, lastInputSnapshot);
            }
          });
    }
  }

  // Drop the transactions that were marked COMPLETED above.
  pruneCompletedTransactions(transactionQueueTree);

  // Transaction state is consistent again, so the completion callbacks can be posted.
  for (Runnable callback : deferredCallbacks) {
    postEvent(callback);
  }

  // Attempt to send whatever transactions are now ready.
  sendAllReadyTransactions();
}