void Repo::RerunTransactionQueue()

in database/src/desktop/core/repo.cc [976:1111]


// Re-evaluates every transaction in `queue` against the latest local state.
//
// Each transaction's path is asserted to be expressible relative to `path`.
// Per transaction, depending on its status:
//   * kStatusNeedsAbort: the transaction is aborted with its stored
//     abort_reason. Its pending write is reverted unless the reason is
//     kErrorWriteCanceled.
//   * kStatusRun with retry_count >= kTransactionMaxRetries: the pending write
//     is reverted and the transaction is aborted with kErrorMaxRetries.
//   * kStatusRun otherwise: the user's transaction function is invoked again
//     on the latest state (computed while ignoring the queue's own pending
//     writes). On success the result is applied under a fresh write id and
//     the old write is reverted; otherwise the pending write is reverted and
//     the transaction is aborted with kErrorTransactionAbortedByUser.
//
// Aborted transactions are marked kStatusComplete. Their futures are completed
// only after the whole queue has been processed and completed transactions
// have been pruned from transaction_queue_tree_. Finally, all transactions
// that are ready are sent.
//
// @param queue Transactions to re-evaluate, processed in order.
// @param path  Path that every transaction's path is asserted to be relative
//              to; also used for logging.
void Repo::RerunTransactionQueue(const std::vector<TransactionDataPtr>& queue,
                                 const Path& path) {
  logger_->LogDebug("RerunTransactionQueue @ %s (# of transaction : %d)",
                    path.c_str(), static_cast<int>(queue.size()));

  if (queue.empty()) {
    // Nothing to do!
    return;
  }

  // Everything needed to complete an aborted transaction's future once the
  // whole queue has been processed.
  struct FutureToComplete {
    FutureToComplete(TransactionDataPtr transaction, Error abort_reason,
                     Variant node)
        : transaction(transaction), abort_reason(abort_reason), node(node) {}
    TransactionDataPtr transaction;
    Error abort_reason;
    Variant node;
  };
  std::vector<FutureToComplete> futures_to_complete;

  // Write ids of the queue's own pending writes. They are excluded when
  // computing the input state handed to each rerun transaction.
  std::vector<WriteId> sets_to_ignore;
  sets_to_ignore.reserve(queue.size());
  for (const TransactionDataPtr& transaction : queue) {
    sets_to_ignore.push_back(transaction->current_write_id);
  }

  for (const TransactionDataPtr& transaction : queue) {
    Optional<Path> relative_path = Path::GetRelative(path, transaction->path);
    assert(relative_path.has_value());

    bool abort_transaction = false;
    Error abort_reason = kErrorNone;
    std::vector<Event> events;

    if (transaction->status == TransactionData::kStatusNeedsAbort) {
      abort_transaction = true;
      abort_reason = transaction->abort_reason;
      // A canceled write is not reverted here.
      if (abort_reason != kErrorWriteCanceled) {
        Extend(&events, server_sync_tree_->AckUserWrite(
                            transaction->current_write_id, kAckRevert,
                            kDoNotPersist, server_time_offset_));
      }
    } else if (transaction->status == TransactionData::kStatusRun) {
      if (transaction->retry_count >= TransactionData::kTransactionMaxRetries) {
        // Out of retries: revert the pending write and give up.
        abort_transaction = true;
        abort_reason = kErrorMaxRetries;
        Extend(&events, server_sync_tree_->AckUserWrite(
                            transaction->current_write_id, kAckRevert,
                            kDoNotPersist, server_time_offset_));
      } else {
        // Rerun the transaction against the latest state.
        Variant current_input =
            GetLatestState(transaction->path, sets_to_ignore);
        // TODO(chkuang): Make sure the local cache does not contain vector.
        //                Gently convert everything for now.
        if (HasVector(current_input)) {
          ConvertVectorToMap(&current_input);
        }

        transaction->current_input_snapshot = current_input;

        // The raw pointer is handed to MutableData; it is kept around only to
        // read back the node the transaction function produced.
        MutableDataInternal* mutable_data_impl =
            new MutableDataInternal(database_, current_input);
        MutableData mutable_data(mutable_data_impl);
        TransactionResult result = transaction->transaction_function(
            &mutable_data, transaction->context);
        if (result == kTransactionResultSuccess) {
          WriteId old_write_id = transaction->current_write_id;

          Variant server_values = GenerateServerValues(server_time_offset_);
          Variant* new_data_node = mutable_data_impl->GetNode();
          Variant new_node_resolved =
              ResolveDeferredValueSnapshot(*new_data_node, server_values);

          transaction->current_output_snapshot_raw = *new_data_node;
          transaction->current_output_snapshot_resolved = new_node_resolved;
          transaction->current_write_id = GetNextWriteId();

          // NOTE(review): old_write_id was already added to sets_to_ignore
          // before this loop, so this appends a duplicate. Presumably harmless
          // because the old write is reverted just below -- confirm whether
          // removing the id from the list was intended instead.
          sets_to_ignore.push_back(old_write_id);
          // Apply the new write first, then revert the one it replaces.
          Extend(&events,
                 server_sync_tree_->ApplyUserOverwrite(
                     transaction->path, *new_data_node, new_node_resolved,
                     transaction->current_write_id,
                     transaction->trigger_local_events ? kOverwriteVisible
                                                       : kOverwriteInvisible,
                     kPersist));
          Extend(&events, server_sync_tree_->AckUserWrite(
                              old_write_id, kAckRevert, kDoNotPersist,
                              server_time_offset_));
        } else {
          // The transaction function did not return
          // kTransactionResultSuccess. Report that explicitly: completing the
          // future with kErrorNone would be indistinguishable from a
          // committed transaction.
          abort_transaction = true;
          abort_reason = kErrorTransactionAbortedByUser;
          Extend(&events, server_sync_tree_->AckUserWrite(
                              transaction->current_write_id, kAckRevert,
                              kDoNotPersist, server_time_offset_));
        }
      }
    }

    PostEvents(events);

    if (abort_transaction) {
      transaction->status = TransactionData::kStatusComplete;

      // Completion is deferred until the whole queue has been processed.
      futures_to_complete.emplace_back(transaction, abort_reason,
                                       transaction->current_input_snapshot);

      // Removing a callback can trigger pruning which can muck with
      // merged_data/visible_data (as it prunes data). So defer removing the
      // callback until later.
      Repo::scheduler().Schedule(NewCallback(
          [](Repo* repo, TransactionDataPtr transaction) {
            repo->RemoveEventCallback(transaction->outstanding_listener.get(),
                                      QuerySpec(transaction->path));
          },
          this, transaction));
    }
  }
  PruneCompletedTransactions(&transaction_queue_tree_);

  // Complete the futures of aborted transactions, handing back the last input
  // snapshot each of them observed.
  for (auto& future_to_complete : futures_to_complete) {
    TransactionDataPtr& transaction = future_to_complete.transaction;
    Error& abort_reason = future_to_complete.abort_reason;
    Variant& node = future_to_complete.node;
    DataSnapshot snapshot(new DataSnapshotInternal(
        database_, node, QuerySpec(transaction->path)));
    transaction->ref_future->CompleteWithResult(transaction->future_handle,
                                                abort_reason, snapshot);
  }

  SendAllReadyTransactions();
}