frontend/converters/chunking.cc (186 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "frontend/converters/chunking.h"
#include <algorithm>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/status/statusor.h"
#include "absl/strings/substitute.h"
#include "common/errors.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
namespace {
// UTF-8 is at most 4 bytes. The follow chart explains the format of each
// UTF-8 character.
// Char. number range | UTF-8 octet sequence
// (hexadecimal) | (binary)
// --------------------+---------------------------------------------
// 0000 0000-0000 007F | 0xxxxxxx
// 0000 0080-0000 07FF | 110xxxxx 10xxxxxx
// 0000 0800-0000 FFFF | 1110xxxx 10xxxxxx 10xxxxxx
// 0001 0000-0010 FFFF | 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
//
// More detail in the spec: https://tools.ietf.org/html/rfc3629#page-4
const uint8_t kPartialUTF8Bytes = 1 << 7; // 0b10000000
const uint8_t kUTF8TwoBytes = 3 << 6; // 0b11000000
const uint8_t kUTF8ThreeBytes = 7 << 5; // 0b11100000
const uint8_t kUTF8FourBytes = 15 << 4; // 0b11110000
const int64_t kMaxUTF8CharSize = 4;
bool IsPartialUTF8(char input) {
return (static_cast<int>(input) & kPartialUTF8Bytes) != 0;
}
int64_t RemovePartialUTF8(absl::string_view string, int64_t available) {
for (int64_t pos = available - 1;
pos >= std::max<int64_t>(available - kMaxUTF8CharSize, 0); pos--) {
char partial = string[pos];
// Only remove partial UTF-8 character.
if ((partial & kUTF8FourBytes) == kUTF8FourBytes) {
if (pos != available - 4) available = pos;
break;
} else if ((partial & kUTF8ThreeBytes) == kUTF8ThreeBytes) {
if (pos != available - 3) available = pos;
break;
} else if ((partial & kUTF8TwoBytes) == kUTF8TwoBytes) {
if (pos != available - 2) available = pos;
break;
}
}
return available;
}
// Constructs a set of PartialResultSets. Data will be chunked as necessary to
// comply with the Cloud Spanner streaming chunk size limit. Only Strings and
// Lists need to be chunked (Structs are not a valid column type and will return
// an error if encountered).
class ResultSetBuilder {
public:
explicit ResultSetBuilder(
int64_t max_chunk_size,
std::vector<google::spanner::v1::PartialResultSet>* results)
: max_chunk_size_(max_chunk_size), results_(results) {
if (results_->empty()) {
results_->emplace_back();
}
current_chunk_size_ = results_->back().ByteSizeLong();
stack_.push_back(results_->back().mutable_values());
}
// Adds the incoming value to the set of PartialResultSets chunking as
// necessary.
absl::Status AddValue(const protobuf::Value& value) {
// If the current size exceeds the limit, create a new chunk.
if (HasExceededChunkLimit()) {
StartNewResultSet();
}
// Adds the value to the current result set. It will be chunked into pieces
// if the size of a result set would exceed max_chunk_size_. In that case,
// partial values will be added to the end of this result set and beginning
// of the next one. The partial results will be merged back together by the
// receiving client.
auto value_size = value.ByteSizeLong();
switch (value.kind_case()) {
case protobuf::Value::kListValue: {
// Check if list can fit into current chunk.
if (current_chunk_size_ + value_size <= max_chunk_size_) {
AddUnchunkedValue(value);
} else {
StartList();
for (const auto& list_value : value.list_value().values()) {
ZETASQL_RETURN_IF_ERROR(AddValue(list_value));
}
FinishList();
}
CheckListBoundary();
break;
}
case protobuf::Value::kStringValue: {
// Check if string can fit into current chunk.
if (current_chunk_size_ + value_size <= max_chunk_size_) {
AddUnchunkedValue(value);
} else {
AddString(value.string_value());
}
CheckStringBoundary();
break;
}
case protobuf::Value::kBoolValue:
case protobuf::Value::kNumberValue:
case protobuf::Value::kNullValue:
AddUnchunkedValue(value);
break;
default:
return error::Internal(absl::Substitute(
"Cannot convert value of type ($0) to a potentially "
"chunked PartialResultSet.",
value.GetTypeName()));
}
return absl::OkStatus();
}
private:
ResultSetBuilder(const ResultSetBuilder&) = delete;
ResultSetBuilder& operator=(const ResultSetBuilder&) = delete;
bool HasExceededChunkLimit() {
return current_chunk_size_ >= max_chunk_size_;
}
bool IsListOpen() { return stack_.size() > 1; }
// Adds a value as the next value without chunking. The value will be added to
// a list if there are any nested lists otherwise it will be added as the next
// value in results. Used for the fast path when it is known this will not
// need to be chunked.
void AddUnchunkedValue(const protobuf::Value& value) {
*stack_.back()->Add() = value;
current_chunk_size_ += value.ByteSizeLong();
}
// If a nested list ends at the boundary of the chunk, we need to make sure
// that an empty list is added at the beginning of the next chunk so they will
// be merged together. Otherwise it could end up being incorrectly merged with
// a disjoint list in the next chunk. We explicitly check for this to catch
// edge cases.
void CheckListBoundary() {
if (HasExceededChunkLimit() && IsListOpen()) {
StartNewResultSet();
// Add and empty list to merge with the last list from the previous
// chunk.
StartList();
FinishList();
}
}
// If a string nested inside a list ends at the boundary of the chunk, we
// need to make sure that an empty string is added at the beginning of the
// next chunk so they will be merged together. Otherwise it could end up being
// incorrectly merged with another string in the next chunk. We explicitly
// check for this to catch edge cases.
void CheckStringBoundary() {
if (HasExceededChunkLimit() && IsListOpen()) {
StartNewResultSet();
// The last string ended within the previous chunk, so we don't want to
// concatenate it with the next string. Add an empty string to prevent
// this.
AddUnchunkedString("");
}
}
// Adds a string as the next value. The value will be added to a list if there
// are any nested lists otherwise it will be added as the next value in
// results.
void AddString(absl::string_view str) {
if (str.empty()) {
// Handle empty string case.
AddUnchunkedString("");
return;
}
while (!str.empty()) {
int64_t available = std::max(max_chunk_size_ - current_chunk_size_,
static_cast<int64_t>(0));
if (str.size() > available) {
// Strings are UTF-8 encoded. Not all client libraries support a split
// UTF-8 character. Flush the entire and not partial UTF-8 character.
if (available > 0 && IsPartialUTF8(str[available - 1])) {
available = RemovePartialUTF8(str, available);
}
// Chunk the string into pieces.
AddUnchunkedString(str.substr(0, available));
results_->back().set_chunked_value(true);
StartNewResultSet();
str.remove_prefix(available);
} else {
// String can fit into remaing space of current chunk.
AddUnchunkedString(str);
break;
}
}
}
// Adds an unchunked string to the current result set or list.
void AddUnchunkedString(absl::string_view str) {
auto value = stack_.back()->Add();
value->mutable_string_value()->assign(str.data(), str.size());
current_chunk_size_ += value->ByteSizeLong();
}
// Adds a list as the next value. The list will be nested in another list if
// there are any lists currently in the stack otherwise it will be added as
// the next value in results.
void StartList() {
auto value = stack_.back()->Add();
stack_.push_back(value->mutable_list_value()->mutable_values());
current_chunk_size_ += value->ByteSizeLong();
}
// Removes a list from the stack.
void FinishList() { stack_.pop_back(); }
// Adds a new partial result set to results. If list(s) are currently being
// processed it will create corresponding list(s) in the new chunk. The
// current result set will have chunked_value set to true if a list was
// currently being processed or if a string is split up.
void StartNewResultSet() {
if (IsListOpen()) {
// Always mark as chunked if inside a list.
results_->back().set_chunked_value(true);
}
size_t stack_depth = stack_.size() - 1;
stack_.clear();
results_->emplace_back();
stack_.push_back(results_->back().mutable_values());
for (int i = 0; i < stack_depth; ++i) {
auto list = stack_.back()->Add()->mutable_list_value();
stack_.push_back(list->mutable_values());
}
// Reset the size of the current result set.
current_chunk_size_ = results_->back().ByteSizeLong();
}
// The size of the current chunk that is being appended to. This is an
// estimate of the current chunk size. This estimate should work fine in
// practice since the max chunk size is 1MB and the default message size limit
// is 4MB for gRPC. Since we do not explicitly track the metadata, our size
// estimate could be off by as much as a factor of 2. However, this shouldn't
// be a problem since it will be well below the gRPC limit.
int64_t current_chunk_size_;
// The maximum allowed size of a chunk.
int64_t max_chunk_size_;
// The list of PartialResultSets that store the resulting chunks.
std::vector<::google::spanner::v1::PartialResultSet>* results_;
// The list stack is used to track nested lists. When a result set is chunked
// all current lists need to be truncated and matching versions created in the
// next chunk.
std::vector<google::protobuf::RepeatedPtrField<protobuf::Value>*> stack_;
};
} // namespace
absl::StatusOr<std::vector<google::spanner::v1::PartialResultSet>>
ChunkResultSet(const google::spanner::v1::ResultSet& set,
int64_t max_chunk_size) {
std::vector<google::spanner::v1::PartialResultSet> results;
results.emplace_back();
*results.front().mutable_metadata() = set.metadata();
ResultSetBuilder builder(max_chunk_size, &results);
for (const auto& row : set.rows()) {
for (const auto& value : row.values()) {
ZETASQL_RETURN_IF_ERROR(builder.AddValue(value));
}
}
return results;
}
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google