be/src/olap/row_cursor.cpp (262 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/row_cursor.h"
#include <glog/logging.h>
#include <stdlib.h>
#include <algorithm>
#include <new>
#include <numeric>
#include <ostream>
#include "olap/field.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/tablet_schema.h"
#include "util/slice.h"
using std::nothrow;
using std::string;
using std::vector;
namespace doris {
using namespace ErrorCode;
RowCursor::RowCursor()
: _fixed_len(0), _variable_len(0), _string_field_count(0), _long_text_buf(nullptr) {}
// Releases every buffer owned by the cursor: the fixed-length buffer created by
// _init(), the variable-length buffer, and (when STRING columns were counted
// and the array was allocated) each long text buffer plus the array itself,
// all created by _alloc_buf().
RowCursor::~RowCursor() {
    delete[] _owned_fixed_buf;
    delete[] _variable_buf;
    if (_long_text_buf != nullptr && _string_field_count > 0) {
        char** const last = _long_text_buf + _string_field_count;
        for (char** slot = _long_text_buf; slot != last; ++slot) {
            free(*slot);
        }
        free(_long_text_buf);
    }
}
// Common initialization once _schema has been built.
//
// For every requested column id: checks that _schema has a field for it, adds
// its variable length to _variable_len, and adds STRING columns to
// _string_field_count. Then allocates a zero-initialized fixed buffer of
// _schema->schema_size() bytes. That buffer is recorded in _owned_fixed_buf so
// the destructor frees it.
//
// Returns INIT_FAILED if a requested column is absent from _schema, and
// MEM_ALLOC_FAILED if the fixed buffer cannot be allocated.
Status RowCursor::_init(const std::vector<uint32_t>& columns) {
    _variable_len = 0;
    for (auto cid : columns) {
        if (_schema->column(cid) == nullptr) {
            // A missing column is a schema mismatch, not an allocation failure;
            // report which column id was not found.
            return Status::Error<INIT_FAILED>(
                    "Fail to init row cursor: column {} does not exist in schema.", cid);
        }
        _variable_len += column_schema(cid)->get_variable_len();
        if (_schema->column(cid)->type() == FieldType::OLAP_FIELD_TYPE_STRING) {
            ++_string_field_count;
        }
    }
    _fixed_len = _schema->schema_size();
    // Value-initialization ("()") zero-fills the buffer.
    _fixed_buf = new (nothrow) char[_fixed_len]();
    if (_fixed_buf == nullptr) {
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _fixed_buf.");
    }
    _owned_fixed_buf = _fixed_buf;
    return Status::OK();
}
// Builds the cursor's own Schema as a copy of `shared_schema`, then runs the
// common per-column initialization for `columns`.
Status RowCursor::_init(const std::shared_ptr<Schema>& shared_schema,
                        const std::vector<uint32_t>& columns) {
    _schema = std::make_unique<Schema>(*shared_schema);
    return _init(columns);
}
// Builds the cursor's Schema from the tablet columns restricted to `columns`,
// then runs the common per-column initialization.
Status RowCursor::_init(const std::vector<TabletColumnPtr>& schema,
                        const std::vector<uint32_t>& columns) {
    _schema = std::make_unique<Schema>(schema, columns);
    return _init(columns);
}
// Sizes, allocates and wires up the buffers that back a scan key. Callers run
// this after _init(), which has already built _schema and _fixed_buf.
//
// NOTE: cid equals the column index, so scan_keys[cid] is the key for column
// cid. Hyperloglog cannot be a key column, so it needs no handling here.
//
// Per column type:
//  - VARCHAR: the cell's Slice points into _variable_buf, sized to the key;
//  - CHAR:    same, sized to max(key length, declared column length);
//  - ARRAY:   reserves max(key length, column length) bytes but is not wired up;
//  - STRING:  the Slice points at its own DEFAULT_TEXT_LENGTH buffer from
//             _long_text_buf, and that slot is also handed to the schema column.
Status RowCursor::_init_scan_key(TabletSchemaSPtr schema,
                                 const std::vector<std::string>& scan_keys) {
    // Recount from scratch. _init() has already added the STRING columns to
    // _string_field_count. Counting again on top of that would allocate twice
    // as many long text buffers as the second loop below hands out.
    _variable_len = 0;
    _string_field_count = 0;
    for (auto cid : _schema->column_ids()) {
        const TabletColumn& column = schema->column(cid);
        FieldType type = column.type();
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
            _variable_len += scan_keys[cid].length();
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR ||
                   type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
            _variable_len += std::max(scan_keys[cid].length(), column.length());
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
            ++_string_field_count;
        }
    }

    // Allocate _variable_buf (_variable_len bytes) and the long text buffers.
    RETURN_IF_ERROR(_alloc_buf());

    char* variable_ptr = _variable_buf;
    char** long_text_ptr = _long_text_buf;
    for (auto cid : _schema->column_ids()) {
        const TabletColumn& column = schema->column(cid);
        // The Slice is stored one byte past the start of the cell; that leading
        // byte is presumably the null flag (confirm against the cell layout).
        char* fixed_ptr = _fixed_buf + _schema->column_offset(cid);
        FieldType type = column.type();
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
            Slice* slice = reinterpret_cast<Slice*>(fixed_ptr + 1);
            slice->data = variable_ptr;
            slice->size = scan_keys[cid].length();
            variable_ptr += scan_keys[cid].length();
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR) {
            Slice* slice = reinterpret_cast<Slice*>(fixed_ptr + 1);
            slice->data = variable_ptr;
            slice->size = std::max(scan_keys[cid].length(), column.length());
            variable_ptr += slice->size;
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
            _schema->mutable_column(cid)->set_long_text_buf(long_text_ptr);
            Slice* slice = reinterpret_cast<Slice*>(fixed_ptr + 1);
            slice->data = *(long_text_ptr);
            slice->size = DEFAULT_TEXT_LENGTH;
            ++long_text_ptr;
        }
    }
    return Status::OK();
}
// Initializes the cursor over all num_columns() columns of `schema`.
Status RowCursor::init(TabletSchemaSPtr schema) {
    const auto all_columns = schema->num_columns();
    return init(schema->columns(), all_columns);
}
// Initializes the cursor over every column in `schema`.
Status RowCursor::init(const std::vector<TabletColumnPtr>& schema) {
    const size_t all_columns = schema.size();
    return init(schema, all_columns);
}
// Initializes the cursor over columns [0, column_count) of `schema`.
// Returns INVALID_ARGUMENT if column_count exceeds schema->num_columns().
Status RowCursor::init(TabletSchemaSPtr schema, size_t column_count) {
    const auto schema_columns = schema->num_columns();
    if (column_count > schema_columns) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                column_count, schema_columns);
    }
    std::vector<uint32_t> column_ids(column_count);
    std::iota(column_ids.begin(), column_ids.end(), uint32_t {0});
    return _init(schema->columns(), column_ids);
}
// Initializes the cursor over the first `column_count` entries of `schema`.
// Returns INVALID_ARGUMENT if column_count exceeds schema.size().
Status RowCursor::init(const std::vector<TabletColumnPtr>& schema, size_t column_count) {
    const size_t schema_columns = schema.size();
    if (column_count > schema_columns) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                column_count, schema_columns);
    }
    std::vector<uint32_t> column_ids(column_count);
    std::iota(column_ids.begin(), column_ids.end(), uint32_t {0});
    return _init(schema, column_ids);
}
// Initializes the cursor over an explicit list of column ids of `schema`.
Status RowCursor::init(TabletSchemaSPtr schema, const std::vector<uint32_t>& columns) {
    return _init(schema->columns(), columns);
}
// Initializes the cursor as a scan key over columns [0, scan_keys.size()) of
// `schema`, then binds the key buffers via _init_scan_key().
// Returns INVALID_ARGUMENT if there are more scan keys than schema columns.
Status RowCursor::init_scan_key(TabletSchemaSPtr schema,
                                const std::vector<std::string>& scan_keys) {
    const size_t key_count = scan_keys.size();
    if (key_count > schema->num_columns()) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                key_count, schema->num_columns());
    }
    std::vector<uint32_t> key_columns;
    key_columns.reserve(key_count);
    for (size_t idx = 0; idx < key_count; ++idx) {
        key_columns.push_back(static_cast<uint32_t>(idx));
    }
    RETURN_IF_ERROR(_init(schema->columns(), key_columns));
    return _init_scan_key(schema, scan_keys);
}
// Initializes the cursor as a scan key over columns [0, scan_keys.size()),
// using a copy of `shared_schema` as the cursor schema. It then binds the key
// buffers via _init_scan_key() using the column types of `schema`.
//
// Returns INVALID_ARGUMENT if there are more scan keys than `schema` columns.
// This is the same guard as the other init_scan_key overload; without it,
// column ids past the end of the schema would be looked up.
Status RowCursor::init_scan_key(TabletSchemaSPtr schema, const std::vector<std::string>& scan_keys,
                                const std::shared_ptr<Schema>& shared_schema) {
    const size_t scan_key_size = scan_keys.size();
    if (scan_key_size > schema->num_columns()) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                scan_key_size, schema->num_columns());
    }
    std::vector<uint32_t> columns(scan_key_size);
    std::iota(columns.begin(), columns.end(), 0);
    RETURN_IF_ERROR(_init(shared_schema, columns));
    return _init_scan_key(schema, scan_keys);
}
// Turns the cursor into the largest key: every column's cell is set to its
// field type's maximum value and the column is marked not-null.
Status RowCursor::build_max_key() {
    for (const auto cid : _schema->column_ids()) {
        column_schema(cid)->set_to_max(cell_ptr(cid));
        set_not_null(cid);
    }
    return Status::OK();
}
// Turns the cursor into the smallest key: every column's cell is filled with
// its field type's minimum value and the column is then flagged NULL.
// (Presumably NULL orders before every value; confirm against the comparator.)
Status RowCursor::build_min_key() {
    for (const auto cid : _schema->column_ids()) {
        column_schema(cid)->set_to_min(cell_ptr(cid));
        set_null(cid);
    }
    return Status::OK();
}
// Loads the cursor positionally from `tuple`: entry i goes to the i-th column
// id of the schema. Null entries only set the column's null flag. Other entries
// mark the column not-null and are parsed into the cell with
// Field::from_string, using the field's precision and scale.
//
// Returns INVALID_ARGUMENT when the tuple size differs from the number of
// column ids, or the first parse error (also logged as a warning).
Status RowCursor::from_tuple(const OlapTuple& tuple) {
    if (tuple.size() != _schema->num_column_ids()) {
        return Status::Error<INVALID_ARGUMENT>(
                "column count does not match. tuple_size={}, field_count={}", tuple.size(),
                _schema->num_column_ids());
    }
    const auto& column_ids = _schema->column_ids();
    for (size_t pos = 0; pos < tuple.size(); ++pos) {
        const auto cid = column_ids[pos];
        if (tuple.is_null(pos)) {
            set_null(cid);
        } else {
            const Field* field = column_schema(cid);
            set_not_null(cid);
            char* cell = cell_ptr(cid);
            Status parsed = field->from_string(cell, tuple.get_value(pos), field->get_precision(),
                                               field->get_scale());
            if (!parsed.ok()) {
                LOG(WARNING) << "fail to convert field from string. string="
                             << tuple.get_value(pos) << ", res=" << parsed;
                return parsed;
            }
        }
    }
    return Status::OK();
}
// Converts the cursor into an OlapTuple, one entry per column id:
//  - a column id with no field in the schema becomes an empty string;
//  - a NULL cell becomes a tuple null;
//  - otherwise the cell rendered by Field::to_string.
OlapTuple RowCursor::to_tuple() const {
    OlapTuple tuple;
    for (const auto cid : _schema->column_ids()) {
        if (_schema->column(cid) == nullptr) {
            tuple.add_value("");
            continue;
        }
        if (is_null(cid)) {
            tuple.add_null();
            continue;
        }
        tuple.add_value(column_schema(cid)->to_string(cell_ptr(cid)));
    }
    return tuple;
}
// Debug rendering of the cursor. Columns are joined with '|', and each column
// is written as "<null flag>&<value>". The null flag is is_null() printed as a
// number; the value is "NULL" for null cells, otherwise Field::to_string.
std::string RowCursor::to_string() const {
    std::string out;
    bool first_column = true;
    for (const auto cid : _schema->column_ids()) {
        if (!first_column) {
            out.append("|");
        }
        first_column = false;

        const Field* field = column_schema(cid);
        const auto null_flag = is_null(cid);
        out.append(std::to_string(null_flag));
        out.append("&");
        out.append(null_flag ? std::string("NULL") : field->to_string(cell_ptr(cid)));
    }
    return out;
}
// Allocates the buffers whose sizes _init_scan_key() has just computed:
//  - _variable_buf: _variable_len zero-initialized bytes;
//  - _long_text_buf (only when _string_field_count > 0): an array of
//    _string_field_count pointers, each to a DEFAULT_TEXT_LENGTH-byte buffer.
//
// Returns MEM_ALLOC_FAILED if any allocation fails. Anything already allocated
// stays owned by the cursor and is released by the destructor.
Status RowCursor::_alloc_buf() {
    _variable_buf = new (nothrow) char[_variable_len]();
    if (_variable_buf == nullptr) {
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _variable_buf.");
    }
    if (_string_field_count > 0) {
        // calloc (not malloc) null-fills the slot array. If a per-slot
        // allocation below fails part-way, the destructor, which frees every
        // slot, then frees nullptr instead of uninitialized garbage pointers.
        _long_text_buf = static_cast<char**>(calloc(_string_field_count, sizeof(char*)));
        if (_long_text_buf == nullptr) {
            return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
        }
        char** const last = _long_text_buf + _string_field_count;
        for (char** slot = _long_text_buf; slot != last; ++slot) {
            *slot = static_cast<char*>(malloc(DEFAULT_TEXT_LENGTH * sizeof(char)));
            if (*slot == nullptr) {
                return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
            }
        }
    }
    return Status::OK();
}
} // namespace doris