backend/schema/catalog/change_stream.h (130 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_SCHEMA_CATALOG_CHANGE_STREAM_H_
#define THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_SCHEMA_CATALOG_CHANGE_STREAM_H_
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>
#include "zetasql/public/type.h"
#include "absl/container/flat_hash_set.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/substitute.h"
#include "backend/common/ids.h"
#include "backend/schema/ddl/operations.pb.h"
#include "backend/schema/graph/schema_node.h"
#include "backend/schema/updater/schema_validation_context.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
inline constexpr absl::string_view kChangeStreamRetentionPeriodDefault = "1d";
inline constexpr absl::string_view kChangeStreamValueCaptureTypeDefault =
"OLD_AND_NEW_VALUES";
inline constexpr absl::string_view kChangeStreamValueCaptureTypeNewRow =
"NEW_ROW";
inline constexpr absl::string_view kChangeStreamValueCaptureTypeNewValues =
"NEW_VALUES";
inline constexpr absl::string_view
kChangeStreamValueCaptureTypeNewRowOldValues = "NEW_ROW_AND_OLD_VALUES";
class Table;
class Column;
class ChangeStream : public SchemaNode {
public:
// Returns the name of this change stream.
std::string Name() const { return name_; }
std::string tvf_name() const { return tvf_name_; }
const ChangeStreamID id() const { return id_; }
// Returns the tables and columns that is tracked.
absl::flat_hash_map<std::string, std::vector<std::string>>
tracked_tables_columns() const {
return tracked_tables_columns_;
}
// Returns the backing table which stores the change stream data.
const Table* change_stream_data_table() const {
return change_stream_data_table_;
}
const Table* change_stream_partition_table() const {
return change_stream_partition_table_;
}
std::optional<std::string> retention_period() const {
return retention_period_;
}
int64_t parsed_retention_period() const { return parsed_retention_period_; }
absl::Time creation_time() const { return creation_time_; }
std::optional<std::string> value_capture_type() const {
return value_capture_type_;
}
bool track_all() const { return track_all_; }
const ddl::ChangeStreamForClause* for_clause() const {
return for_clause_.has_value() ? &for_clause_.value() : nullptr;
}
std::optional<bool> exclude_insert() const { return exclude_insert_; }
std::optional<bool> exclude_update() const { return exclude_update_; }
std::optional<bool> exclude_delete() const { return exclude_delete_; }
std::optional<bool> exclude_ttl_deletes() const {
return exclude_ttl_deletes_;
}
bool HasExplicitValidOptions() const {
return value_capture_type().has_value() || retention_period().has_value() ||
exclude_insert().has_value() || exclude_update().has_value() ||
exclude_delete().has_value() || exclude_ttl_deletes().has_value();
}
const ::google::protobuf::RepeatedPtrField<ddl::SetOption> options() const {
return options_;
}
std::optional<uint32_t> tvf_postgresql_oid() const {
return tvf_postgresql_oid_;
}
void set_tvf_postgresql_oid(uint32_t tvf_postgresql_oid) {
tvf_postgresql_oid_ = tvf_postgresql_oid;
}
// SchemaNode interface implementation.
// ------------------------------------
std::optional<SchemaNameInfo> GetSchemaNameInfo() const override {
return SchemaNameInfo{
.name = name_, .kind = "ChangeStream", .global = true};
}
absl::Status Validate(SchemaValidationContext* context) const override;
absl::Status ValidateUpdate(const SchemaNode* old,
SchemaValidationContext* context) const override;
std::string DebugString() const override;
class Builder;
class Editor;
private:
friend class ChangeStreamValidator;
using ValidationFn = std::function<absl::Status(const ChangeStream*,
SchemaValidationContext*)>;
using UpdateValidationFn = std::function<absl::Status(
const ChangeStream*, const ChangeStream*, SchemaValidationContext*)>;
// Constructors are private and only friend classes are able to build.
ChangeStream(const ValidationFn& validate,
const UpdateValidationFn& validate_update)
: validate_(validate), validate_update_(validate_update) {}
ChangeStream(const ChangeStream&) = default;
std::unique_ptr<SchemaNode> ShallowClone() const override {
return absl::WrapUnique(new ChangeStream(*this));
}
absl::Status DeepClone(SchemaGraphEditor* editor,
const SchemaNode* orig) override;
// Validation delegates.
const ValidationFn validate_;
const UpdateValidationFn validate_update_;
// Name of this change stream.
std::string name_;
// Name of this change stream's table valued function.
std::string tvf_name_;
// The tables and columns that this change stream tracks.
absl::flat_hash_map<std::string, std::vector<std::string>>
tracked_tables_columns_;
// We should not set the default value for the set options; otherwise, we
// cannot know if they are specified in the DDL statement.
std::optional<std::string> retention_period_;
std::optional<std::string> value_capture_type_;
bool track_all_ = false;
// TODO: assign the ID during change stream creation
// A unique ID for identifying this change stream in the schema that owns this
// change stream.
ChangeStreamID id_;
// Parsed retention period in seconds, use for query timestamp validation,
// default is 1 day in seconds
int64_t parsed_retention_period_ = 24 * 60 * 60;
// Timestamp which this change stream is created, use for prevent querying
// change stream before creation
absl::Time creation_time_;
// A copy of for clause statement. We cannot use a pointer here because
// ddl::ChangeStreamForClause will be destroyed after parsing and we will get
// a segmentation fault when accessing its content. A unique pointer does not
// work since we need to support ShallowClone() and the ownership will be
// passed to the new object.
std::optional<ddl::ChangeStreamForClause> for_clause_;
::google::protobuf::RepeatedPtrField<ddl::SetOption> options_;
// The backing table that stores the change stream data.
const Table* change_stream_data_table_;
// The table that stores the partition data.
const Table* change_stream_partition_table_;
// The OID of the change stream's TVF. Only assigned a value for the
// POSTGRESQL dialect. std::nullopt indicates that no OID has been assigned.
std::optional<uint32_t> tvf_postgresql_oid_ = std::nullopt;
// If true, exclude recording corresponding mofication type from current
// change stream. Default is false if not set.
std::optional<bool> exclude_insert_ = std::nullopt;
std::optional<bool> exclude_update_ = std::nullopt;
std::optional<bool> exclude_delete_ = std::nullopt;
// If true, exclude recording ttl delete from current change stream. Default
// is false if not set.
std::optional<bool> exclude_ttl_deletes_ = std::nullopt;
};
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google
#endif // THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_SCHEMA_CATALOG_CHANGE_STREAM_H_