absl::Status TransactionStore::Read()

in backend/transaction/transaction_store.cc [215:312]


absl::Status TransactionStore::Read(
    const Table* table, const KeyRange& key_range,
    absl::Span<const Column* const> columns,
    std::unique_ptr<StorageIterator>* storage_itr,
    bool allow_pending_commit_timestamps_in_read) const {
  // Acquire locks to prevent another transaction to modify this entity.
  ZETASQL_RETURN_IF_ERROR(AcquireReadLock(table, key_range, columns));

  // Read rows buffered within transaction store.
  // Table lookup.
  std::vector<FixedRowStorageIterator::Row> rows;
  auto table_itr = buffered_ops_.find(table);
  if (table_itr != buffered_ops_.end()) {
    auto table = table_itr->second;
    // Key range lookup.
    auto begin_itr = table.lower_bound(key_range.start_key());
    auto end_itr = table.lower_bound(key_range.limit_key());

    for (auto itr = begin_itr; itr != end_itr; ++itr) {
      if (itr->second.first == OpType::kInsert) {
        // Add inserts into the StorageIterator.
        const Row& row_values = itr->second.second;
        ValueList values;
        values.reserve(columns.size());
        for (const Column* column : columns) {
          if (row_values.find(column) == row_values.end()) {
            values.emplace_back(zetasql::values::Null(column->GetType()));
          } else {
            values.emplace_back(row_values.at(column));
          }
        }
        rows.emplace_back(itr->first, std::move(values));
      }
    }
  }
  auto buffered_rows_count = rows.size();

  // Read from the base storage and apply the changes buffered in transaction
  // store.
  std::unique_ptr<StorageIterator> base_itr;
  ZETASQL_RETURN_IF_ERROR(base_storage_->Read(absl::InfiniteFuture(), table->id(),
                                      key_range, GetColumnIDs(columns),
                                      &base_itr));
  while (base_itr->Next()) {
    ValueList values;
    values.reserve(columns.size());

    RowOp row_op;
    // Merge column values from transaction store & base storage.
    if (RowExistsInBuffer(table, base_itr->Key(), &row_op)) {
      if (row_op.first == OpType::kDelete || row_op.first == OpType::kInsert) {
        // Omit the deletes from the output. The buffered inserts have already
        // been added to the output.
        continue;
      }
      if (row_op.first == OpType::kUpdate) {
        // Update the column values to reflect the changes in transaction store.
        for (int i = 0; i < columns.size(); i++) {
          if (row_op.second.find(columns[i]) != row_op.second.end()) {
            values.emplace_back(row_op.second[columns[i]]);
          } else if (base_itr->ColumnValue(i).is_valid()) {
            values.emplace_back(base_itr->ColumnValue(i));
          } else {
            values.emplace_back(zetasql::values::Null(columns[i]->GetType()));
          }
        }
      }
    } else {
      // Copy the base storage column values since this row does not exists in
      // transaction store.
      for (int i = 0; i < columns.size(); i++) {
        if (base_itr->ColumnValue(i).is_valid()) {
          values.emplace_back(base_itr->ColumnValue(i));
        } else {
          values.emplace_back(zetasql::values::Null(columns[i]->GetType()));
        }
      }
    }
    rows.emplace_back(base_itr->Key(), std::move(values));
  }

  // Pending commit timestamp values in buffer cannot be returned to
  // clients.
  if (!allow_pending_commit_timestamps_in_read) {
    ZETASQL_RETURN_IF_ERROR(commit_timestamp_tracker_->CheckRead(table, columns));
  }

  // The keys need to be sorted to provide iterating in order. The rows added
  // from buffered ops and the ones added from base storage were already in
  // sorted order respectively. Merge these two sorted subarrays to get overall
  // sorted order.
  ABSL_DCHECK(buffered_rows_count <= rows.size())
      << buffered_rows_count << " vs " << rows.size();
  auto first_base_row_iter = rows.begin() + buffered_rows_count;
  std::inplace_merge(rows.begin(), first_base_row_iter, rows.end(), SortByKey);
  *storage_itr = std::make_unique<FixedRowStorageIterator>(std::move(rows));
  return absl::OkStatus();
}