in database/src/desktop/core/repo.cc [976:1111]
/// Re-evaluates every transaction in `queue` and completes the futures of the
/// ones that end up aborted.
///
/// For each queued transaction, in order:
///  - kStatusNeedsAbort: the transaction is aborted with its stored
///    `abort_reason`; its pending write is acked with kAckRevert unless the
///    reason is kErrorWriteCanceled.
///  - kStatusRun with `retry_count >= kTransactionMaxRetries`: aborted with
///    kErrorMaxRetries and its pending write is acked with kAckRevert.
///  - kStatusRun otherwise: the user's transaction function is invoked again
///    on the state returned by GetLatestState() (computed while ignoring the
///    write ids collected in `sets_to_ignore`). On success the result is
///    applied as a new user overwrite under a fresh write id and the previous
///    write id is acked with kAckRevert; otherwise the transaction is aborted.
///
/// Events produced for a transaction are posted before the next transaction
/// is processed. Once the whole queue has been walked, completed transactions
/// are pruned from `transaction_queue_tree_`, the futures of aborted
/// transactions are completed, and SendAllReadyTransactions() is called.
///
/// @param queue Transactions to re-evaluate. Nothing but the debug log
///              happens when it is empty.
/// @param path  Path for which Path::GetRelative(path, transaction->path) is
///              asserted to yield a value for every queued transaction.
void Repo::RerunTransactionQueue(const std::vector<TransactionDataPtr>& queue,
                                 const Path& path) {
  logger_->LogDebug("RerunTransactionQueue @ %s (# of transaction : %d)",
                    path.c_str(), static_cast<int>(queue.size()));
  if (queue.empty()) {
    // Nothing to do!
    return;
  }

  // Everything needed to complete the future of an aborted transaction once
  // the whole queue has been processed.
  struct FutureToComplete {
    FutureToComplete(TransactionDataPtr transaction, Error abort_reason,
                     Variant node)
        : transaction(transaction), abort_reason(abort_reason), node(node) {}
    TransactionDataPtr transaction;
    Error abort_reason;
    Variant node;
  };
  std::vector<FutureToComplete> futures_to_complete;

  // Write ids of every queued transaction; passed to GetLatestState() below
  // when computing the input for a re-run.
  std::vector<WriteId> sets_to_ignore;
  sets_to_ignore.reserve(queue.size());
  for (const TransactionDataPtr& transaction : queue) {
    sets_to_ignore.push_back(transaction->current_write_id);
  }

  for (const TransactionDataPtr& transaction : queue) {
    // Only consumed by the assert, so it is unused when asserts are compiled
    // out.
    Optional<Path> relative_path = Path::GetRelative(path, transaction->path);
    assert(relative_path.has_value());

    bool abort_transaction = false;
    Error abort_reason = kErrorNone;
    std::vector<Event> events;

    if (transaction->status == TransactionData::kStatusNeedsAbort) {
      abort_transaction = true;
      abort_reason = transaction->abort_reason;
      // A canceled write is the one case where the pending write is not
      // acked here.
      if (abort_reason != kErrorWriteCanceled) {
        Extend(&events, server_sync_tree_->AckUserWrite(
                            transaction->current_write_id, kAckRevert,
                            kDoNotPersist, server_time_offset_));
      }
    } else if (transaction->status == TransactionData::kStatusRun) {
      if (transaction->retry_count >= TransactionData::kTransactionMaxRetries) {
        // Out of retries: give up on this transaction.
        abort_transaction = true;
        abort_reason = kErrorMaxRetries;
        Extend(&events, server_sync_tree_->AckUserWrite(
                            transaction->current_write_id, kAckRevert,
                            kDoNotPersist, server_time_offset_));
      } else {
        // Re-run the transaction function against the latest state.
        Variant current_input =
            GetLatestState(transaction->path, sets_to_ignore);
        // TODO(chkuang): Make sure the local cache does not contain vector.
        //                Gently convert everything for now.
        if (HasVector(current_input)) {
          ConvertVectorToMap(&current_input);
        }
        transaction->current_input_snapshot = current_input;

        MutableDataInternal* mutable_data_impl =
            new MutableDataInternal(database_, current_input);
        MutableData mutable_data(mutable_data_impl);
        // NOTE(review): `error` is never assigned after this point, so an
        // abort requested by the transaction function is reported below with
        // kErrorNone. Confirm that this is the intended abort reason.
        Error error = kErrorNone;
        TransactionResult result = transaction->transaction_function(
            &mutable_data, transaction->context);

        if (result == kTransactionResultSuccess) {
          WriteId old_write_id = transaction->current_write_id;
          Variant server_values = GenerateServerValues(server_time_offset_);
          Variant* new_data_node = mutable_data_impl->GetNode();
          Variant new_node_resolved =
              ResolveDeferredValueSnapshot(*new_data_node, server_values);

          transaction->current_output_snapshot_raw = *new_data_node;
          transaction->current_output_snapshot_resolved = new_node_resolved;
          transaction->current_write_id = GetNextWriteId();

          // NOTE(review): old_write_id was already added to sets_to_ignore in
          // the initial pass over the queue, so this adds a duplicate entry.
          sets_to_ignore.push_back(old_write_id);

          // Apply the new output under the fresh write id first, then ack
          // the previous write id.
          Extend(&events,
                 server_sync_tree_->ApplyUserOverwrite(
                     transaction->path, *new_data_node, new_node_resolved,
                     transaction->current_write_id,
                     transaction->trigger_local_events ? kOverwriteVisible
                                                       : kOverwriteInvisible,
                     kPersist));
          Extend(&events, server_sync_tree_->AckUserWrite(
                              old_write_id, kAckRevert, kDoNotPersist,
                              server_time_offset_));
        } else {
          // The transaction function did not report success.
          abort_transaction = true;
          abort_reason = error;
          Extend(&events, server_sync_tree_->AckUserWrite(
                              transaction->current_write_id, kAckRevert,
                              kDoNotPersist, server_time_offset_));
        }
      }
    }

    PostEvents(events);

    if (abort_transaction) {
      transaction->status = TransactionData::kStatusComplete;
      // NOTE(review): this reference is constructed but never used in this
      // function; confirm whether it can be dropped.
      DatabaseReferenceInternal* database_ref_impl =
          new DatabaseReferenceInternal(database_, path);
      DatabaseReference ref(database_ref_impl);

      // The future is completed after the loop, with the input snapshot the
      // transaction last saw.
      futures_to_complete.push_back(FutureToComplete(
          transaction, abort_reason, transaction->current_input_snapshot));

      // Removing a callback can trigger pruning which can muck with
      // merged_data/visible_data (as it prunes data). So defer removing the
      // callback until later.
      Repo::scheduler().Schedule(NewCallback(
          [](Repo* repo, TransactionDataPtr transaction) {
            repo->RemoveEventCallback(transaction->outstanding_listener.get(),
                                      QuerySpec(transaction->path));
          },
          this, transaction));
    }
  }

  // Drop the transactions that were marked kStatusComplete above.
  PruneCompletedTransactions(&transaction_queue_tree_);

  // Complete the futures of all aborted transactions.
  for (auto& future_to_complete : futures_to_complete) {
    TransactionDataPtr& transaction = future_to_complete.transaction;
    Error& abort_reason = future_to_complete.abort_reason;
    Variant& node = future_to_complete.node;
    DataSnapshot snapshot(new DataSnapshotInternal(
        database_, node, QuerySpec(transaction->path)));
    transaction->ref_future->CompleteWithResult(transaction->future_handle,
                                                abort_reason, snapshot);
  }

  SendAllReadyTransactions();
}