in backend/transaction/transaction_store.cc [215:312]
// Reads `columns` of the rows of `table` whose keys fall in `key_range`, as
// seen by this transaction: rows from the base storage (read at
// absl::InfiniteFuture()) with the mutations buffered in this store overlaid.
//
//  - Buffered inserts inside the range are returned with their buffered
//    values; columns absent from the buffered row read as typed NULLs.
//  - Base rows that are buffered as deletes or inserts are not taken from the
//    base storage (deletes are omitted; inserts were already emitted).
//  - Base rows buffered as updates take the buffered value for each updated
//    column and the base value otherwise.
//  - Base column values that are not valid are returned as typed NULLs.
//
// A read lock covering `table`, `key_range` and `columns` is acquired first.
// Unless `allow_pending_commit_timestamps_in_read` is true, the read fails if
// commit_timestamp_tracker_->CheckRead rejects it (pending commit timestamp
// values in the buffer must not be returned to clients).
//
// On success `*storage_itr` is set to an iterator over the merged rows, ordered
// by SortByKey.
absl::Status TransactionStore::Read(
    const Table* table, const KeyRange& key_range,
    absl::Span<const Column* const> columns,
    std::unique_ptr<StorageIterator>* storage_itr,
    bool allow_pending_commit_timestamps_in_read) const {
  // Acquire locks to prevent another transaction from modifying this entity.
  ZETASQL_RETURN_IF_ERROR(AcquireReadLock(table, key_range, columns));

  std::vector<FixedRowStorageIterator::Row> rows;

  // Emit the rows inserted by this transaction within the key range.
  auto table_itr = buffered_ops_.find(table);
  if (table_itr != buffered_ops_.end()) {
    // Bind by reference: copying would duplicate every buffered row of the
    // table on each read.
    const auto& table_ops = table_itr->second;
    const auto end_itr = table_ops.lower_bound(key_range.limit_key());
    for (auto itr = table_ops.lower_bound(key_range.start_key());
         itr != end_itr; ++itr) {
      if (itr->second.first != OpType::kInsert) continue;
      const Row& row_values = itr->second.second;
      ValueList values;
      values.reserve(columns.size());
      for (const Column* column : columns) {
        auto value_itr = row_values.find(column);
        values.emplace_back(value_itr != row_values.end()
                                ? value_itr->second
                                : zetasql::values::Null(column->GetType()));
      }
      rows.emplace_back(itr->first, std::move(values));
    }
  }
  const auto buffered_rows_count = rows.size();

  // Read from the base storage and apply the changes buffered in this store.
  std::unique_ptr<StorageIterator> base_itr;
  ZETASQL_RETURN_IF_ERROR(base_storage_->Read(absl::InfiniteFuture(), table->id(),
                                      key_range, GetColumnIDs(columns),
                                      &base_itr));

  // Value of column `i` in the current base row, or a typed NULL when the
  // base storage holds no valid value for it.
  auto base_value = [&](int i) -> zetasql::Value {
    const zetasql::Value& value = base_itr->ColumnValue(i);
    return value.is_valid() ? value
                            : zetasql::values::Null(columns[i]->GetType());
  };

  const int num_columns = static_cast<int>(columns.size());
  while (base_itr->Next()) {
    ValueList values;
    values.reserve(columns.size());
    RowOp row_op;
    if (RowExistsInBuffer(table, base_itr->Key(), &row_op)) {
      if (row_op.first == OpType::kDelete || row_op.first == OpType::kInsert) {
        // Omit the deletes from the output. The buffered inserts have already
        // been added to the output.
        continue;
      }
      if (row_op.first == OpType::kUpdate) {
        // Overlay the buffered column values on top of the base row.
        const Row& updated_values = row_op.second;
        for (int i = 0; i < num_columns; ++i) {
          auto update_itr = updated_values.find(columns[i]);
          values.emplace_back(update_itr != updated_values.end()
                                  ? update_itr->second
                                  : base_value(i));
        }
      }
    } else {
      // This row is not touched by the transaction; copy the base values.
      for (int i = 0; i < num_columns; ++i) {
        values.emplace_back(base_value(i));
      }
    }
    rows.emplace_back(base_itr->Key(), std::move(values));
  }

  // Pending commit timestamp values in buffer cannot be returned to clients.
  if (!allow_pending_commit_timestamps_in_read) {
    ZETASQL_RETURN_IF_ERROR(commit_timestamp_tracker_->CheckRead(table, columns));
  }

  // The buffered rows come from an ordered lookup and the base rows from the
  // base storage iterator. Each run is already sorted, so merging the two runs
  // yields the overall key order.
  ABSL_DCHECK(buffered_rows_count <= rows.size())
      << buffered_rows_count << " vs " << rows.size();
  auto first_base_row_iter = rows.begin() + buffered_rows_count;
  std::inplace_merge(rows.begin(), first_base_row_iter, rows.end(), SortByKey);
  *storage_itr = std::make_unique<FixedRowStorageIterator>(std::move(rows));
  return absl::OkStatus();
}