backend/schema/verifiers/column_value_verifiers.cc (239 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "backend/schema/verifiers/column_value_verifiers.h"
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include "zetasql/public/functions/string.h"
#include "zetasql/public/type.pb.h"
#include "zetasql/public/value.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "backend/common/ids.h"
#include "backend/common/indexing.h"
#include "backend/common/rows.h"
#include "backend/datamodel/key.h"
#include "backend/datamodel/types.h"
#include "backend/datamodel/value.h"
#include "backend/schema/catalog/column.h"
#include "backend/schema/catalog/table.h"
#include "backend/schema/updater/schema_validation_context.h"
#include "backend/storage/in_memory_storage.h"
#include "backend/storage/iterator.h"
#include "common/errors.h"
#include "common/limits.h"
#include "absl/status/status.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
namespace {
// Scans every row of `table` (KeyRange::All()) as of the context's pending
// commit timestamp, requesting only `column`, and invokes `verifier` on each
// column value the iterator exposes together with the row's key.
//
// Returns the error from the storage read if it fails, otherwise the first
// non-OK status returned by `verifier`. Returns OK when the scan finishes
// without the verifier reporting an error.
absl::Status VerifyColumnValue(
    const SchemaValidationContext* context, const Table* table,
    const Column* column,
    const std::function<absl::Status(const zetasql::Value& column_value,
                                     const Key& key)>& verifier) {
  std::unique_ptr<StorageIterator> row_cursor;
  ZETASQL_RETURN_IF_ERROR(context->storage()->Read(
      context->pending_commit_timestamp(), table->id(), KeyRange::All(),
      {column->id()}, &row_cursor));

  for (;;) {
    if (!row_cursor->Next()) {
      break;
    }
    // A single column id is requested above; the loop still walks however many
    // columns the iterator reports for the current row.
    for (int col = 0; col < row_cursor->NumColumns(); ++col) {
      ZETASQL_RETURN_IF_ERROR(
          verifier(row_cursor->ColumnValue(col), row_cursor->Key()));
    }
  }
  return absl::OkStatus();
}
// Checks a single STRING `value` against the maximum length of the new column
// definition.
//
// The value must always be valid UTF-8; otherwise InvalidStringEncoding is
// returned. The length compared against `new_max_length` is the number of
// bytes when the new column type is BYTES, and the UTF-8 character count as
// computed by LengthUtf8 otherwise. A value longer than `new_max_length`
// yields InvalidColumnSizeReduction, which includes the offending row's key.
absl::Status VerifyStringColumnValue(absl::string_view table_name,
                                     absl::string_view column_name,
                                     const zetasql::Value& value,
                                     const Key& key,
                                     const zetasql::Type* new_column_type,
                                     int64_t new_max_length) {
  ZETASQL_RET_CHECK(value.type()->IsString());

  const auto& text = value.string_value();

  // UTF-8 validation happens regardless of the target type.
  absl::Status utf8_status;
  int64_t num_chars;
  const bool is_valid_utf8 =
      zetasql::functions::LengthUtf8(text, &num_chars, &utf8_status);
  if (!is_valid_utf8) {
    return error::InvalidStringEncoding(table_name, column_name);
  }

  // BYTES columns are limited by byte count, not by character count.
  const int64_t measured_length = new_column_type->IsBytes()
                                      ? static_cast<int64_t>(text.length())
                                      : num_chars;
  if (measured_length <= new_max_length) {
    return absl::OkStatus();
  }
  return error::InvalidColumnSizeReduction(column_name, new_max_length,
                                           measured_length, key.DebugString());
}
// Checks a single PROTO `value` against the maximum length of the new column
// definition.
//
// Only a conversion to BYTES is measured: the proto value is converted to a
// std::string and its byte length is compared with `new_max_length`. For any
// other new column type the measured length stays at zero. A length greater
// than `new_max_length` yields InvalidColumnSizeReduction, which includes the
// offending row's key.
absl::Status VerifyProtoColumnValue(absl::string_view table_name,
                                    absl::string_view column_name,
                                    const zetasql::Value& value,
                                    const Key& key,
                                    const zetasql::Type* new_column_type,
                                    int64_t new_max_length) {
  ZETASQL_RET_CHECK(value.type()->IsProto());

  int64_t serialized_length = 0;
  if (new_column_type->IsBytes()) {
    // Measure the value the same way it would be stored after conversion:
    // as a flat string of bytes.
    const std::string serialized(value.proto_value());
    serialized_length = serialized.length();
  }

  if (serialized_length <= new_max_length) {
    return absl::OkStatus();
  }
  return error::InvalidColumnSizeReduction(
      column_name, new_max_length, serialized_length, key.DebugString());
}
// Checks a single INT64 `value` when the column is being changed to
// `new_column_type`.
//
// Only a conversion to an ENUM type is validated; any other target type is
// accepted as is. For an ENUM target the value must survive narrowing to
// 32 bits unchanged, and, unless the enum type allows unnamed values, it must
// map to a name in the enum. Otherwise InvalidEnumValue is returned.
absl::Status VerifyInt64ColumnValue(absl::string_view table_name,
                                    absl::string_view column_name,
                                    const zetasql::Value& value,
                                    const Key& key,
                                    const zetasql::Type* new_column_type) {
  ZETASQL_RET_CHECK(value.type()->IsInt64());
  if (!new_column_type->IsEnum()) {
    return absl::OkStatus();
  }

  const int64_t wide_value = value.int64_value();
  const int32_t enum_number = static_cast<int32_t>(wide_value);
  const bool fits_in_int32 = (enum_number == wide_value);

  const auto* enum_type = new_column_type->AsEnum();
  const std::string* enum_name;
  // The name lookup is skipped when the value does not fit in 32 bits or when
  // the enum type accepts unnamed values.
  if (fits_in_int32 && (enum_type->EnumAllowsUnnamedValues() ||
                        enum_type->FindName(enum_number, &enum_name))) {
    return absl::OkStatus();
  }

  // NOTE(review): the error carries the narrowed 32-bit number, so an
  // out-of-range INT64 is reported with its truncated value - confirm whether
  // that is intended.
  return error::InvalidEnumValue(
      column_name, enum_number,
      new_column_type->TypeName(zetasql::PRODUCT_INTERNAL),
      key.DebugString());
}
// Checks a single BYTES `value` against the new column definition.
//
// * New type BYTES: only the byte length is compared with `new_max_length`.
//   A value that fits is accepted immediately.
// * New type STRING or PROTO (anything else trips the RET_CHECK): the bytes
//   must be valid UTF-8, and the resulting character count must not exceed
//   `new_max_length`.
//
// Returns InvalidColumnSizeReduction when the value is too long for the new
// column and UTF8StringColumn when the bytes are not valid UTF-8; both errors
// include the offending row's key.
absl::Status VerifyBytesColumnValue(absl::string_view table_name,
                                    absl::string_view column_name,
                                    const zetasql::Value& value,
                                    const Key& key,
                                    const zetasql::Type* new_column_type,
                                    int64_t new_max_length) {
  ZETASQL_RET_CHECK(value.type()->IsBytes());
  if (new_column_type->IsBytes()) {
    if (value.bytes_value().size() > new_max_length) {
      return error::InvalidColumnSizeReduction(column_name, new_max_length,
                                               value.bytes_value().size(),
                                               key.DebugString());
    }
    // BYTES -> BYTES needs no encoding check. Returning here keeps a valid
    // value from falling through to the STRING/PROTO-only RET_CHECK below,
    // which would otherwise fail for every BYTES length verification.
    return absl::OkStatus();
  }

  ZETASQL_RET_CHECK(new_column_type->IsString() || new_column_type->IsProto());

  // Check that it is valid UTF-8 encoding.
  absl::Status error;
  int64_t encoded_chars;
  if (!zetasql::functions::LengthUtf8(value.bytes_value(), &encoded_chars,
                                        &error)) {
    return error::UTF8StringColumn(column_name, key.DebugString());
  }

  // Validate the character length against the new column's limit.
  if (encoded_chars > new_max_length) {
    return error::InvalidColumnSizeReduction(column_name, new_max_length,
                                             encoded_chars, key.DebugString());
  }
  return absl::OkStatus();
}
// Verifies that one stored `value` remains acceptable when its column changes
// from `old_column_type` to `new_column_type` with maximum length
// `new_max_length`.
//
// Invalid and NULL values are always accepted. Array values are verified
// element by element using the types returned by BaseType() for the old and
// new column types; the new type must then also be an array. Scalar values
// are dispatched on the old type to the STRING, PROTO, INT64 or BYTES
// verifier; values of any other old type are accepted without checks.
absl::Status VerifyColumnValueOnTypeChange(
    absl::string_view table_name, absl::string_view column_name,
    const zetasql::Value& value, const Key& key,
    const zetasql::Type* old_column_type,
    const zetasql::Type* new_column_type, int64_t new_max_length) {
  ZETASQL_RET_CHECK(old_column_type != nullptr && new_column_type != nullptr);

  // Nothing to verify for an absent value; this must be decided before any
  // typed accessor is used on `value`.
  const bool nothing_to_check = !value.is_valid() || value.is_null();
  if (nothing_to_check) {
    return absl::OkStatus();
  }

  if (old_column_type->IsArray()) {
    ZETASQL_RET_CHECK(new_column_type->IsArray());
    const auto* from_element_type = BaseType(old_column_type);
    const auto* to_element_type = BaseType(new_column_type);
    for (const auto& item : value.elements()) {
      ZETASQL_RETURN_IF_ERROR(VerifyColumnValueOnTypeChange(
          table_name, column_name, item, key, from_element_type,
          to_element_type, new_max_length));
    }
    return absl::OkStatus();
  }

  if (old_column_type->IsString()) {
    // STRING may become BYTES, but the BYTES column has to be big enough for
    // the converted data: one UTF-8 character can take up to 4 bytes.
    ZETASQL_RETURN_IF_ERROR(VerifyStringColumnValue(
        table_name, column_name, value, key, new_column_type, new_max_length));
  }

  if (old_column_type->IsProto()) {
    ZETASQL_RETURN_IF_ERROR(VerifyProtoColumnValue(
        table_name, column_name, value, key, new_column_type, new_max_length));
  }

  if (old_column_type->IsInt64()) {
    ZETASQL_RETURN_IF_ERROR(VerifyInt64ColumnValue(table_name, column_name,
                                                   value, key, new_column_type));
  }

  if (old_column_type->IsBytes()) {
    // Converting BYTES to STRING requires the bytes to be valid UTF-8.
    ZETASQL_RETURN_IF_ERROR(VerifyBytesColumnValue(
        table_name, column_name, value, key, new_column_type, new_max_length));
  }

  return absl::OkStatus();
}
// Runs VerifyColumnValueOnTypeChange over every stored value of `column` in
// `table`, using the given old/new types and `new_max_length`. Returns the
// first error encountered, or OK if every value passes.
absl::Status VerifyColumnValuesOnTypeChange(
    const Table* table, const Column* column,
    const zetasql::Type* old_column_type,
    const zetasql::Type* new_column_type, int64_t new_max_length,
    const SchemaValidationContext* context) {
  // The callback is only used during the synchronous scan below, so capturing
  // the arguments by reference is safe.
  const auto check_one_value = [&](const zetasql::Value& value,
                                   const Key& key) -> absl::Status {
    return VerifyColumnValueOnTypeChange(table->Name(), column->Name(), value,
                                         key, old_column_type, new_column_type,
                                         new_max_length);
  };
  return VerifyColumnValue(context, table, column, check_one_value);
}
} // namespace
// Verifies that `column` holds a valid, non-NULL value in every row of
// `table`. Returns NullValueForNotNullColumn, naming the table, column and
// row key, for the first row that does not.
absl::Status VerifyColumnNotNull(const Table* table, const Column* column,
                                 const SchemaValidationContext* context) {
  const auto require_value = [&](const zetasql::Value& value,
                                 const Key& key) -> absl::Status {
    const bool has_value = value.is_valid() && !value.is_null();
    if (has_value) {
      return absl::OkStatus();
    }
    return error::NullValueForNotNullColumn(table->Name(), column->Name(),
                                            key.DebugString());
  };
  return VerifyColumnValue(context, table, column, require_value);
}
// Verifies that every stored value of `column` fits within `new_max_length`.
// Implemented as a type-change verification in which the old and the new type
// are both the column's current type.
absl::Status VerifyColumnLength(const Table* table, const Column* column,
                                int64_t new_max_length,
                                const SchemaValidationContext* context) {
  const auto* unchanged_type = column->GetType();
  return VerifyColumnValuesOnTypeChange(table, column,
                                        /*old_column_type=*/unchanged_type,
                                        /*new_column_type=*/unchanged_type,
                                        new_max_length, context);
}
// Verifies that every value stored under `old_column` is compatible with the
// type and effective maximum length of `new_column`. The values are read
// through `old_column`.
absl::Status VerifyColumnTypeChange(const Table* table,
                                    const Column* old_column,
                                    const Column* new_column,
                                    const SchemaValidationContext* context) {
  const auto* source_type = old_column->GetType();
  const auto* target_type = new_column->GetType();
  const auto target_max_length = new_column->effective_max_length();
  return VerifyColumnValuesOnTypeChange(table, old_column, source_type,
                                        target_type, target_max_length,
                                        context);
}
// Verifies that no value stored in `column` lies in the future relative to
// the pending commit timestamp of the schema change.
//
// Invalid and NULL values are accepted, as are values whose type is not
// TIMESTAMP. Returns CommitTimestampNotInFuture, naming the column, row key
// and offending time, for the first timestamp that is strictly later than the
// pending commit timestamp.
absl::Status VerifyColumnCommitTimestamp(
    const Table* table, const Column* column,
    const SchemaValidationContext* context) {
  return VerifyColumnValue(
      context, table, column,
      [&](const zetasql::Value& value, const Key& key) -> absl::Status {
        if (!value.is_valid() || value.is_null()) {
          return absl::OkStatus();
        }
        // Check that timestamp is not greater than commit time. A value
        // exactly equal to the commit time is not in the future, so the
        // comparison is strict.
        if (value.type()->IsTimestamp() &&
            value.ToTime() > context->pending_commit_timestamp()) {
          return error::CommitTimestampNotInFuture(
              column->Name(), key.DebugString(), value.ToTime());
        }
        return absl::OkStatus();
      });
}
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google