backend/actions/column_value.cc (176 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "backend/actions/column_value.h"
#include <cstdint>
#include <vector>
#include "zetasql/public/functions/string.h"
#include "zetasql/public/value.h"
#include "absl/status/status.h"
#include "absl/types/span.h"
#include "backend/actions/context.h"
#include "backend/actions/ops.h"
#include "backend/datamodel/key.h"
#include "backend/schema/catalog/column.h"
#include "backend/schema/catalog/table.h"
#include "common/clock.h"
#include "common/constants.h"
#include "common/errors.h"
#include "common/limits.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
absl::Status ColumnValueValidator::ValidateColumnValueType(
const Table* table, const Column* const column,
const zetasql::Value& value) const {
// Check that type is same for both column and the corresponding value.
if (column->GetType()->kind() != value.type()->kind()) {
return error::ColumnValueTypeMismatch(table->Name(),
column->GetType()->DebugString(),
value.type()->DebugString());
}
// Check that we are not attempting to write null values to non-nullable
// columns. Writing null to non-nullable generated or default columns is
// temporarily fine, since the violation may be fixed later by an operation
// to update the column.
if (value.is_null() && !column->is_nullable() && !column->is_generated()) {
return error::NullValueForNotNullColumn(table->Name(), column->FullName());
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::ValidateKeyNotNull(const Table* table,
const Key& key) const {
// Incoming key should already have the correct number of columns
// corresponding to the primary key of the given table. So we do not check it
// here.
absl::Span<const KeyColumn* const> primary_key = table->primary_key();
for (int i = 0; i < primary_key.size(); ++i) {
if (!primary_key.at(i)->column()->is_nullable() &&
key.ColumnValue(i).is_null()) {
return error::CannotParseKeyValue(
table->Name(), primary_key.at(i)->column()->Name(),
primary_key.at(i)->column()->GetType()->DebugString());
}
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::ValidateColumnStringValue(
const Table* table, const Column* column,
const zetasql::Value& value) const {
// Validate that strings do not exceed max length.
if (!value.is_null()) {
absl::Status error;
int64_t encoded_chars = 0;
if (!zetasql::functions::LengthUtf8(value.string_value(), &encoded_chars,
&error)) {
return error::InvalidStringEncoding(table->Name(), column->Name());
}
if (encoded_chars > column->effective_max_length()) {
return error::ValueExceedsLimit(column->FullName(), encoded_chars,
column->effective_max_length());
}
if (column->is_placement_key() && !value.string_value().empty() &&
!placements_.contains(value.string_value())) {
return error::UnknownPlacement(value.string_value());
}
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::ValidateColumnBytesValue(
const Table* table, const Column* column,
const zetasql::Value& value) const {
// Validate that bytes do not exceed max length.
if (!value.is_null()) {
if (value.bytes_value().size() > column->effective_max_length()) {
return error::ValueExceedsLimit(column->FullName(),
value.bytes_value().size(),
column->effective_max_length());
}
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::ValidateColumnArrayValue(
const Table* table, const Column* column,
const zetasql::Value& value) const {
// Validate the vector search array length.
if (column->has_vector_length() && !value.is_null()) {
int array_length = value.elements().size();
int expected_length = column->vector_length().value();
if (array_length > expected_length) {
return error::VectorLengthExceedsLimit(column->FullName(), array_length,
expected_length);
} else if (array_length < expected_length) {
return error::VectorLengthLessThanLimit(column->FullName(), array_length,
expected_length);
}
}
// Validate that bytes and string array element types do not exceed max
// length.
if (!value.is_null()) {
if (value.type()->AsArray()->element_type()->IsString()) {
for (const auto& element : value.elements()) {
ZETASQL_RETURN_IF_ERROR(ValidateColumnStringValue(table, column, element));
}
} else if (value.type()->AsArray()->element_type()->IsBytes()) {
for (const auto& element : value.elements()) {
ZETASQL_RETURN_IF_ERROR(ValidateColumnBytesValue(table, column, element));
}
}
// Validate no element in vector search array is null.
// Currently vector length can only be applied on ARRAY containing FLOAT32
// or FLOAT64 elements.
if (column->has_vector_length()) {
for (const auto& element : value.elements()) {
if (element.is_null()) {
return error::DisallowNullsInSearchArray(column->FullName());
}
}
}
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::ValidateColumnTimestampValue(
const Column* const column, const zetasql::Value& value,
Clock* clock) const {
// Check that user provided timestamp value is not in future. Sentinel max
// timestamp value for commit timestamp column can only be set internally.
if (column->allows_commit_timestamp() && !value.is_null() &&
value.ToTime() != kCommitTimestampValueSentinel &&
value.ToTime() > clock->Now()) {
return error::CommitTimestampInFuture(value.ToTime());
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::ValidateKeySize(const Table* table,
const Key& key) const {
int64_t key_size = key.LogicalSizeInBytes();
if (key_size > limits::kMaxKeySizeBytes) {
return error::KeyTooLarge(table->Name(), key_size,
limits::kMaxKeySizeBytes);
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::ValidateInsertUpdateOp(
const Table* table, const std::vector<const Column*>& columns,
const std::vector<zetasql::Value>& values, Clock* clock) const {
for (int i = 0; i < columns.size(); i++) {
ZETASQL_RETURN_IF_ERROR(ValidateColumnValueType(table, columns[i], values[i]));
switch (columns[i]->GetType()->kind()) {
case zetasql::TYPE_ARRAY:
ZETASQL_RETURN_IF_ERROR(ValidateColumnArrayValue(table, columns[i], values[i]));
break;
case zetasql::TYPE_BYTES:
ZETASQL_RETURN_IF_ERROR(ValidateColumnBytesValue(table, columns[i], values[i]));
break;
case zetasql::TYPE_STRING:
ZETASQL_RETURN_IF_ERROR(
ValidateColumnStringValue(table, columns[i], values[i]));
break;
case zetasql::TYPE_TIMESTAMP:
ZETASQL_RETURN_IF_ERROR(
ValidateColumnTimestampValue(columns[i], values[i], clock));
break;
default:
continue;
}
}
return absl::OkStatus();
}
absl::Status ColumnValueValidator::Validate(const ActionContext* ctx,
const InsertOp& op) const {
ZETASQL_RETURN_IF_ERROR(ValidateKeySize(op.table, op.key));
return ValidateInsertUpdateOp(op.table, op.columns, op.values, ctx->clock());
}
absl::Status ColumnValueValidator::Validate(const ActionContext* ctx,
const UpdateOp& op) const {
return ValidateInsertUpdateOp(op.table, op.columns, op.values, ctx->clock());
}
absl::Status ColumnValueValidator::Validate(const ActionContext* ctx,
const DeleteOp& op) const {
return ValidateKeyNotNull(op.table, op.key);
}
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google