// backend/database/change_stream/change_stream_partition_churner.cc
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "backend/database/change_stream/change_stream_partition_churner.h"
#include <memory> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <utility>
#include <vector>
#include "zetasql/public/value.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/flags/flag.h"
#include "absl/log/log.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "backend/access/read.h"
#include "backend/access/write.h"
#include "backend/datamodel/key_set.h"
#include "backend/schema/backfills/change_stream_backfill.h"
#include "backend/schema/catalog/change_stream.h"
#include "backend/schema/catalog/schema.h"
#include "backend/transaction/options.h"
#include "backend/transaction/read_write_transaction.h"
#include "common/change_stream.h"
#include "common/clock.h"
#include "zetasql/base/ret_check.h"
#include "zetasql/base/status_macros.h"
// Minimum age of an active partition token before it is eligible for churning:
// ChurnPartitions() only churns tokens whose start_time lies more than this
// interval in the past.
ABSL_FLAG(absl::Duration, change_stream_churning_interval, absl::Seconds(20),
"Change stream churning interval in seconds.");
// Upper bound on how long each churn thread waits between churn rounds. The
// wait in PeriodicChurnPartitions() ends early when the thread is asked to
// stop.
ABSL_FLAG(absl::Duration, change_stream_churn_thread_sleep_interval,
absl::Seconds(20), "Change stream thread sleep interval.");
// Base sleep applied by PeriodicChurnPartitions() after every ChurnPartitions()
// attempt; a random jitter is added on top of it.
ABSL_FLAG(
absl::Duration, change_stream_churn_thread_retry_sleep_interval,
absl::Milliseconds(20),
"How long to sleep when retrying a failed change stream transaction.");
// Upper bound of the random jitter added to the retry sleep. The churn thread
// multiplies this value by a uniform random number in [0, 1) and interprets
// the product as milliseconds.
ABSL_FLAG(
    int, change_stream_churn_thread_retry_jitter, 100,
    "Upper bound, in milliseconds, of the random jitter added to the sleep "
    "interval when retrying a failed change stream transaction.");
// Checked by CreateChurningThread(): when false, no churn thread is started.
ABSL_FLAG(bool, enable_change_stream_churning, true,
"Whether to enable change stream churning.");
// Read once in the ChangeStreamPartitionChurner constructor. When positive, the
// churning interval, thread sleep interval and retry sleep interval flags are
// all overwritten with this many seconds.
ABSL_FLAG(
int, override_change_stream_partition_token_alive_seconds, -1,
"If set to X seconds, and it's greater than 0, then override the default "
"partition token alive time from 20-40 seconds to X-2X seconds.");
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
using zetasql::values::String;
using zetasql::values::StringArray;
// Builds a churner that creates its transactions through
// `create_read_write_transaction_fn` and reads the current time from `clock`.
//
// When --override_change_stream_partition_token_alive_seconds is positive, the
// churning interval, the churn thread sleep interval and the retry sleep
// interval flags are all overwritten with that many seconds.
ChangeStreamPartitionChurner::ChangeStreamPartitionChurner(
    CreateReadWriteTransactionFn create_read_write_transaction_fn, Clock* clock)
    : create_read_write_transaction_fn_(create_read_write_transaction_fn),
      clock_(clock) {
  const int override_seconds =
      absl::GetFlag(FLAGS_override_change_stream_partition_token_alive_seconds);
  if (override_seconds <= 0) {
    // No override requested: keep whatever the individual flags specify.
    return;
  }
  const absl::Duration override_interval = absl::Seconds(override_seconds);
  absl::SetFlag(&FLAGS_change_stream_churning_interval, override_interval);
  absl::SetFlag(&FLAGS_change_stream_churn_thread_sleep_interval,
                override_interval);
  absl::SetFlag(&FLAGS_change_stream_churn_thread_retry_sleep_interval,
                override_interval);
}
// Starts a background thread running PeriodicChurnPartitions() for
// `change_stream_name` and registers it in `churn_threads_` under that name.
// Does nothing when --enable_change_stream_churning is false.
//
// Requires `mu_` to be held by the caller.
void ChangeStreamPartitionChurner::CreateChurningThread(
    absl::string_view change_stream_name) {
  mu_.AssertHeld();
  const bool churning_enabled =
      absl::GetFlag(FLAGS_enable_change_stream_churning);
  if (churning_enabled) {
    auto entry = std::make_unique<ChurningThread>();
    // The worker only borrows the state; ownership moves into the map below.
    ChurningThread* const worker_state = entry.get();
    // The name is handed to the thread as an owned std::string so the worker
    // does not depend on the caller's string_view staying valid.
    entry->thread =
        std::thread(&ChangeStreamPartitionChurner::PeriodicChurnPartitions,
                    this, std::string(change_stream_name), worker_state);
    churn_threads_.try_emplace(change_stream_name, std::move(entry));
  }
}
// Drops the `churn_threads_` entry registered for `change_stream_name`, if
// there is one. Stopping and joining the worker is presumably handled by
// ChurningThread's destructor (declared in the header) -- confirm there.
//
// Requires `mu_` to be held by the caller.
void ChangeStreamPartitionChurner::ClearChurningThread(
    absl::string_view change_stream_name) {
  mu_.AssertHeld();
  const auto entry = churn_threads_.find(change_stream_name);
  if (entry != churn_threads_.end()) {
    churn_threads_.erase(entry);
  }
}
// Removes every registered churn thread entry while holding `mu_`.
// NOTE(review): tearing down the worker threads is presumably done by
// ChurningThread's destructor (declared in the header) -- confirm there.
void ChangeStreamPartitionChurner::ClearAllChurningThreads() {
  absl::MutexLock lock(&mu_);
  churn_threads_.clear();
}
// Body of a churn thread: repeatedly waits for the churn thread sleep interval
// and then churns the partitions of `change_stream_name`, retrying until a
// churn succeeds. Returns as soon as `churning_thread->stop_thread` is
// observed to be true.
//
// `churning_thread` is the shared state for this worker; its `stop_thread`
// flag is only read while holding `churning_thread->mu`.
void ChangeStreamPartitionChurner::PeriodicChurnPartitions(
    absl::string_view change_stream_name, ChurningThread* churning_thread) {
  // Seed the random generator once per thread instead of once per attempt.
  absl::BitGen bitgen;

  // Blocks for at most `timeout`, waking up early when a stop is requested.
  // Returns true if the thread has been asked to stop.
  const auto stop_requested_within = [churning_thread](absl::Duration timeout) {
    absl::MutexLock l(&churning_thread->mu);
    churning_thread->mu.AwaitWithTimeout(
        absl::Condition(&churning_thread->stop_thread), timeout);
    return churning_thread->stop_thread;
  };

  while (true) {
    if (stop_requested_within(
            absl::GetFlag(FLAGS_change_stream_churn_thread_sleep_interval))) {
      return;
    }
    absl::Status s;
    // In the current state, the emulator only allows one ongoing transaction
    // at a time. Thus, churn might fail occasionally due to conflict with
    // another ongoing transaction. We should retry the churn in cases of
    // failure.
    do {
      s = ChurnPartitions(change_stream_name);
      // Aborts are expected (transaction conflicts); anything else is
      // reported right away rather than after the back-off sleep.
      if (!s.ok() && !absl::IsAborted(s)) {
        ABSL_LOG(ERROR) << "Failed to churn change stream "
                        << change_stream_name << " with status: " << s;
      }
      const auto delay =
          absl::GetFlag(FLAGS_change_stream_churn_thread_retry_jitter) *
          absl::Uniform<double>(bitgen, 0, 1);
      // Back off with a stop-aware wait instead of an uninterruptible sleep,
      // so a stop request is honoured promptly even while churning keeps
      // failing. Without this the retry loop could outlive a stop request
      // indefinitely.
      if (stop_requested_within(
              absl::GetFlag(
                  FLAGS_change_stream_churn_thread_retry_sleep_interval) +
              absl::Milliseconds(delay))) {
        return;
      }
    } while (!s.ok());
  }
}
// TODO: Change stream churn transactions can potentially cause
// user transactions to abort, since there can only be one concurrent
// transaction at a time in the emulator. We need to either update to
// table-level locking, or we need to implement waiting instead of aborting in
// the transaction lock manager.
// Runs one churn round for `change_stream_name` inside a single read-write
// transaction:
//   1. Reads every row of the change stream's partition table.
//   2. Selects the active tokens (null end_time) whose start_time is more than
//      --change_stream_churning_interval in the past, grouped by their
//      next_churn value.
//   3. Moves, splits or merges the selected tokens according to that value.
//   4. Commits the transaction.
//
// Returns OK without doing anything if the change stream is not present in the
// transaction's schema. Returns the first error from creating the transaction,
// reading, writing or committing; a next_churn value other than
// MOVE/SPLIT/MERGE, or a MERGE group that does not hold exactly two tokens,
// fails a RET_CHECK.
absl::Status ChangeStreamPartitionChurner::ChurnPartitions(
    absl::string_view change_stream_name) {
  ZETASQL_ASSIGN_OR_RETURN(auto txn, create_read_write_transaction_fn_(
                                         ReadWriteOptions(), RetryState()));
  const Schema* schema = txn->schema();
  ZETASQL_RET_CHECK(schema != nullptr);
  const ChangeStream* change_stream =
      schema->FindChangeStream(std::string(change_stream_name));
  if (change_stream == nullptr) {
    return absl::OkStatus();
  }

  // Read the change stream partition table. The column order below fixes the
  // indices used with ColumnValue() in the loop.
  backend::ReadArg read_arg;
  read_arg.change_stream_for_partition_table = change_stream->Name();
  read_arg.columns = {"partition_token", "start_time", "end_time",
                      "parents",         "children",   "next_churn"};
  read_arg.key_set = KeySet::All();
  std::unique_ptr<backend::RowCursor> cursor;
  ZETASQL_RETURN_IF_ERROR(txn->Read(read_arg, &cursor));

  const absl::Duration churning_interval =
      absl::GetFlag(FLAGS_change_stream_churning_interval);
  // Maps a next_churn value to the tokens that are due for that operation.
  absl::flat_hash_map<std::string, std::vector<std::string>> churned_partitions;
  // TODO : Consider optimizing the query to not return stale
  // tokens, i.e. prefix the token with start timestamp and use key range
  // prefix.
  while (cursor->Next()) {
    // Only retrieve the active tokens that should be churned: a non-null
    // end_time means the token has already been churned.
    if (!cursor->ColumnValue(2).is_null()) {
      continue;
    }
    // Only churn the tokens whose start time is more than the specified churn
    // interval in the past.
    const absl::Time start_time = cursor->ColumnValue(1).ToTime();
    if (start_time + churning_interval >= clock_->Now()) {
      continue;
    }
    // Take owning copies. Binding a reference to string_value() would dangle
    // if ColumnValue() hands back a temporary value (its declaration is not
    // visible here), and the strings end up stored in the map anyway.
    std::string churn_type = cursor->ColumnValue(5).string_value();
    std::string partition_token = cursor->ColumnValue(0).string_value();
    // operator[] default-constructs the vector on first use of a churn type.
    churned_partitions[std::move(churn_type)].push_back(
        std::move(partition_token));
  }

  // Churn the tokens retrieved above.
  for (const auto& [churn_type, partition_tokens] : churned_partitions) {
    if (churn_type == "MOVE") {
      // Make sure to move each partition.
      for (const auto& partition_token : partition_tokens) {
        ZETASQL_RETURN_IF_ERROR(
            MovePartition(change_stream_name, partition_token, txn.get()));
      }
    } else if (churn_type == "SPLIT") {
      for (const auto& partition_token : partition_tokens) {
        ZETASQL_RETURN_IF_ERROR(
            SplitPartition(change_stream_name, partition_token, txn.get()));
      }
    } else {
      ZETASQL_RET_CHECK(churn_type == "MERGE")
          << "Unexpected churn type: " << churn_type;
      // Check that the number of tokens to merge is exactly 2.
      ZETASQL_RET_CHECK(partition_tokens.size() == 2)
          << "Expected exactly 2 partition tokens to merge, found "
          << partition_tokens.size();
      ZETASQL_RETURN_IF_ERROR(MergePartition(change_stream_name,
                                             partition_tokens[0],
                                             partition_tokens[1], txn.get()));
    }
  }
  return txn->Commit();
}
// Buffers, in `txn`, the writes that replace `partition_token` of
// `change_stream_name` with a single freshly generated successor token.
// Returns the status of the write.
absl::Status ChangeStreamPartitionChurner::MovePartition(
    absl::string_view change_stream_name, absl::string_view partition_token,
    ReadWriteTransaction* txn) {
  // Token string for the successor partition.
  const std::string successor_token = CreatePartitionTokenString();
  const auto partition_table =
      MakeChangeStreamPartitionTableName(change_stream_name);
  // Written into start_time / end_time so both are set to the transaction
  // commit timestamp.
  const auto commit_timestamp = String("spanner.commit_timestamp()");

  Mutation mutation;
  // Successor row: starts at the commit timestamp, has the churned token as
  // its only parent, and is itself scheduled for a MOVE next time (a moved
  // partition always keeps moving).
  mutation.AddWriteOp(
      MutationOpType::kInsert, partition_table,
      {"partition_token", "start_time", "parents", "next_churn"},
      {{String(successor_token), commit_timestamp,
        StringArray({std::string(partition_token)}), String("MOVE")}});
  // Churned row: ends at the commit timestamp and points at the successor as
  // its only child.
  mutation.AddWriteOp(MutationOpType::kUpdate, partition_table,
                      {"partition_token", "end_time", "children"},
                      {{String(partition_token), commit_timestamp,
                        StringArray({successor_token})}});
  return txn->Write(mutation);
}
// Buffers, in `txn`, the writes that replace `partition_token` of
// `change_stream_name` with two freshly generated child tokens. Returns the
// status of the write.
absl::Status ChangeStreamPartitionChurner::SplitPartition(
    absl::string_view change_stream_name, absl::string_view partition_token,
    ReadWriteTransaction* txn) {
  // Token strings for the two child partitions.
  const std::string first_child_token = CreatePartitionTokenString();
  const std::string second_child_token = CreatePartitionTokenString();
  const auto partition_table =
      MakeChangeStreamPartitionTableName(change_stream_name);
  // Written into start_time / end_time so both are set to the transaction
  // commit timestamp.
  const auto commit_timestamp = String("spanner.commit_timestamp()");

  Mutation mutation;
  // One child row per new token, inserted in generation order: each starts at
  // the commit timestamp, has the churned token as its only parent, and is
  // scheduled to be merged next time (a split is followed by a merge).
  for (const std::string* child_token :
       {&first_child_token, &second_child_token}) {
    mutation.AddWriteOp(
        MutationOpType::kInsert, partition_table,
        {"partition_token", "start_time", "parents", "next_churn"},
        {{String(*child_token), commit_timestamp,
          StringArray({std::string(partition_token)}), String("MERGE")}});
  }
  // Churned row: ends at the commit timestamp and lists both children.
  mutation.AddWriteOp(
      MutationOpType::kUpdate, partition_table,
      {"partition_token", "end_time", "children"},
      {{String(partition_token), commit_timestamp,
        StringArray({first_child_token, second_child_token})}});
  return txn->Write(mutation);
}
// Buffers, in `txn`, the writes that replace `first_partition_token` and
// `second_partition_token` of `change_stream_name` with one freshly generated
// merged token. Returns the status of the write.
absl::Status ChangeStreamPartitionChurner::MergePartition(
    absl::string_view change_stream_name,
    absl::string_view first_partition_token,
    absl::string_view second_partition_token, ReadWriteTransaction* txn) {
  // Token string for the merged partition.
  const std::string merged_token = CreatePartitionTokenString();
  const auto partition_table =
      MakeChangeStreamPartitionTableName(change_stream_name);
  // Written into start_time / end_time so both are set to the transaction
  // commit timestamp.
  const auto commit_timestamp = String("spanner.commit_timestamp()");

  Mutation mutation;
  // Merged row: starts at the commit timestamp, lists both merged tokens as
  // parents, and is scheduled to be split next time (a merge is followed by a
  // split).
  mutation.AddWriteOp(
      MutationOpType::kInsert, partition_table,
      {"partition_token", "start_time", "parents", "next_churn"},
      {{String(merged_token), commit_timestamp,
        StringArray({std::string(first_partition_token),
                     std::string(second_partition_token)}),
        String("SPLIT")}});
  // Parent rows, first then second: each ends at the commit timestamp and
  // points at the merged token as its only child.
  for (const absl::string_view parent_token :
       {first_partition_token, second_partition_token}) {
    mutation.AddWriteOp(MutationOpType::kUpdate, partition_table,
                        {"partition_token", "end_time", "children"},
                        {{String(parent_token), commit_timestamp,
                          StringArray({merged_token})}});
  }
  return txn->Write(mutation);
}
void ChangeStreamPartitionChurner::Update(const Schema* schema) {
// Iterate through the change streams in the schema.
absl::MutexLock l(&mu_);
absl::flat_hash_set<std::string> change_stream_names;
for (const auto& [change_stream_name, churn_thread] : churn_threads_) {
change_stream_names.insert(change_stream_name);
}
for (const auto* change_stream : schema->change_streams()) {
// If the thread does not exist in the factory.
if (!change_stream_names.contains(change_stream->Name())) {
CreateChurningThread(change_stream->Name());
}
}
// Remove all nonexistent change stream threads.
for (auto& change_stream_name : change_stream_names) {
if (schema->FindChangeStream(change_stream_name) == nullptr) {
ClearChurningThread(change_stream_name);
}
}
}
// Returns the number of entries currently registered in `churn_threads_`,
// read while holding `mu_`.
int ChangeStreamPartitionChurner::GetNumThreads() {
  absl::MutexLock lock(&mu_);
  const auto registered = churn_threads_.size();
  return static_cast<int>(registered);
}
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google