in src/main/java/com/google/firebase/database/core/Repo.java [787:892]
/**
 * Sends every transaction in {@code queue} to the server as a single compare-and-put rooted at
 * {@code path}, and settles the transactions once the server has answered.
 *
 * <p>When the put succeeds, each transaction is marked COMPLETED, its write is acknowledged on
 * the server sync tree, its outstanding value listener is removed and its completion handler is
 * scheduled. When the put fails, the transactions are moved back to RUN or to NEEDS_ABORT and
 * the transactions under {@code path} are re-run.
 *
 * @param queue transactions to send; each one is asserted to be in the RUN state
 * @param path location of the combined write; each transaction's path is resolved relative to it
 */
private void sendTransactionQueue(final List<TransactionData> queue, final Path path) {
  // Collect the write ids of the queued transactions; they are handed to getLatestState as the
  // writes to ignore when computing the state the hash is taken from.
  List<Long> pendingWriteIds = new ArrayList<>();
  for (TransactionData queued : queue) {
    pendingWriteIds.add(queued.currentWriteId);
  }

  Node baseState = this.getLatestState(path, pendingWriteIds);
  // With hijackHash set, a deliberately wrong hash is sent instead of the real one
  // (presumably a testing hook -- confirm against where hijackHash is assigned).
  String expectedHash = hijackHash ? "badhash" : baseState.getHash();

  // Mark each transaction as sent, bump its retry count, and layer its output onto the base
  // state in queue order.
  Node mergedSnapshot = baseState;
  for (TransactionData queued : queue) {
    assert queued.status
        == TransactionStatus.RUN; // sendTransactionQueue: items in queue should all be run.
    queued.status = TransactionStatus.SENT;
    queued.retryCount++;
    // By this point the transaction's output snapshot must be defined.
    mergedSnapshot =
        mergedSnapshot.updateChild(
            Path.getRelative(path, queued.path), queued.currentOutputSnapshotRaw);
  }
  Object payload = mergedSnapshot.getValue(true);

  // Send the put.
  connection.compareAndPut(
      path.asList(),
      payload,
      expectedHash,
      new RequestResultCallback() {
        @Override
        public void onRequestResult(String optErrorCode, String optErrorMessage) {
          DatabaseError error = fromErrorCode(optErrorCode, optErrorMessage);
          warnIfWriteFailed("Transaction", path, error);

          if (error != null) {
            // The transactions are no longer sent; update their status accordingly.
            final boolean stale = error.getCode() == DatabaseError.DATA_STALE;
            for (TransactionData pending : queue) {
              if (!stale) {
                // Any other error aborts the transaction and records the reason.
                pending.status = TransactionStatus.NEEDS_ABORT;
                pending.abortReason = error;
              } else if (pending.status == TransactionStatus.SENT_NEEDS_ABORT) {
                pending.status = TransactionStatus.NEEDS_ABORT;
              } else {
                // Stale data: put the transaction back into the RUN state.
                pending.status = TransactionStatus.RUN;
              }
            }
            // The merged data was reverted, so re-run any remaining transactions and raise
            // events.
            rerunTransactions(path);
            return;
          }

          List<Event> ackEvents = new ArrayList<>();
          List<Runnable> completions = new ArrayList<>();
          for (final TransactionData done : queue) {
            done.status = TransactionStatus.COMPLETED;
            ackEvents.addAll(
                serverSyncTree.ackUserWrite(
                    done.currentWriteId, /*revert=*/ false, /*persist=*/ false, serverClock));

            // The output snapshot is never unset, so it is expected to be present for a
            // completed transaction.
            Node resolved = done.currentOutputSnapshotResolved;
            final DataSnapshot result =
                InternalHelpers.createDataSnapshot(
                    InternalHelpers.createReference(Repo.this, done.path),
                    IndexedNode.from(resolved));
            completions.add(
                new Runnable() {
                  @Override
                  public void run() {
                    runTransactionOnComplete(done.handler, null, true, result);
                  }
                });

            // Drop the outstanding value listener that was registered for this transaction.
            removeEventCallback(
                new ValueEventRegistration(
                    Repo.this,
                    done.outstandingListener,
                    QuerySpec.defaultQueryAtPath(done.path)));
          }

          // Remove the completed transactions from the queue tree.
          pruneCompletedTransactions(transactionQueueTree.subTree(path));
          // Other pending transactions may now be ready to go out.
          sendAllReadyTransactions();
          Repo.this.postEvents(ackEvents);
          // Finally, schedule the completion callbacks.
          for (Runnable completion : completions) {
            postEvent(completion);
          }
        }
      });
}