in be/src/vec/exec/format/parquet/parquet_pred_cmp.h [122:385]
static bool _filter_by_min_max(const ColumnValueRange<primitive_type>& col_val_range,
const ScanPredicate& predicate, const FieldSchema* col_schema,
const std::string& encoded_min, const std::string& encoded_max,
const cctz::time_zone& ctz, bool use_min_max_value = false) {
using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
std::vector<CppType> predicate_values;
for (const void* v : predicate.values) {
predicate_values.emplace_back(*reinterpret_cast<const CppType*>(v));
}
CppType min_value;
CppType max_value;
std::unique_ptr<std::string> encoded_min_copy;
std::unique_ptr<std::string> encoded_max_copy;
tparquet::Type::type physical_type = col_schema->physical_type;
switch (col_val_range.type()) {
#define DISPATCH(REINTERPRET_TYPE, PARQUET_TYPE) \
case REINTERPRET_TYPE: \
if (col_schema->physical_type != PARQUET_TYPE) return false; \
min_value = *reinterpret_cast<const CppType*>(encoded_min.data()); \
max_value = *reinterpret_cast<const CppType*>(encoded_max.data()); \
break;
FOR_REINTERPRET_TYPES(DISPATCH)
#undef DISPATCH
case TYPE_FLOAT:
if constexpr (std::is_same_v<CppType, float>) {
if (col_schema->physical_type != tparquet::Type::FLOAT) {
return false;
}
min_value = *reinterpret_cast<const CppType*>(encoded_min.data());
max_value = *reinterpret_cast<const CppType*>(encoded_max.data());
if (std::isnan(min_value) || std::isnan(max_value)) {
return false;
}
// Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
if (std::signbit(min_value) == 0 && min_value == 0.0F) {
min_value = -0.0F;
}
if (std::signbit(max_value) != 0 && max_value == -0.0F) {
max_value = 0.0F;
}
break;
} else {
return false;
}
case TYPE_DOUBLE:
if constexpr (std::is_same_v<CppType, float>) {
if (col_schema->physical_type != tparquet::Type::DOUBLE) {
return false;
}
min_value = *reinterpret_cast<const CppType*>(encoded_min.data());
max_value = *reinterpret_cast<const CppType*>(encoded_max.data());
if (std::isnan(min_value) || std::isnan(max_value)) {
return false;
}
// Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
if (std::signbit(min_value) == 0 && min_value == 0.0) {
min_value = -0.0;
}
if (std::signbit(max_value) != 0 && max_value == -0.0) {
max_value = 0.0;
}
break;
} else {
return false;
}
case TYPE_VARCHAR:
[[fallthrough]];
case TYPE_CHAR:
[[fallthrough]];
case TYPE_STRING:
if constexpr (std::is_same_v<CppType, StringRef>) {
if (!use_min_max_value) {
encoded_min_copy = std::make_unique<std::string>(encoded_min);
encoded_max_copy = std::make_unique<std::string>(encoded_max);
if (!_try_read_old_utf8_stats(*encoded_min_copy, *encoded_max_copy)) {
return false;
}
min_value = StringRef(*encoded_min_copy);
max_value = StringRef(*encoded_max_copy);
} else {
min_value = StringRef(encoded_min);
max_value = StringRef(encoded_max);
}
} else {
return false;
}
break;
case TYPE_DECIMALV2:
if constexpr (std::is_same_v<CppType, DecimalV2Value>) {
size_t max_precision = max_decimal_precision<Decimal128V2>();
if (col_schema->parquet_schema.precision < 1 ||
col_schema->parquet_schema.precision > max_precision ||
col_schema->parquet_schema.scale > max_precision) {
return false;
}
int v2_scale = DecimalV2Value::SCALE;
if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
min_value = DecimalV2Value(_decode_binary_decimal<Decimal128V2>(
col_schema, encoded_min, v2_scale));
max_value = DecimalV2Value(_decode_binary_decimal<Decimal128V2>(
col_schema, encoded_max, v2_scale));
} else if (physical_type == tparquet::Type::INT32) {
min_value = DecimalV2Value(_decode_primitive_decimal<Decimal128V2, Int32>(
col_schema, encoded_min, v2_scale));
max_value = DecimalV2Value(_decode_primitive_decimal<Decimal128V2, Int32>(
col_schema, encoded_max, v2_scale));
} else if (physical_type == tparquet::Type::INT64) {
min_value = DecimalV2Value(_decode_primitive_decimal<Decimal128V2, Int64>(
col_schema, encoded_min, v2_scale));
max_value = DecimalV2Value(_decode_primitive_decimal<Decimal128V2, Int64>(
col_schema, encoded_max, v2_scale));
} else {
return false;
}
} else {
return false;
}
break;
case TYPE_DECIMAL32:
[[fallthrough]];
case TYPE_DECIMAL64:
[[fallthrough]];
case TYPE_DECIMAL128I:
if constexpr (std::is_same_v<CppType, Decimal32> ||
std::is_same_v<CppType, Decimal64> ||
std::is_same_v<CppType, Decimal128V3>) {
size_t max_precision = max_decimal_precision<CppType>();
if (col_schema->parquet_schema.precision < 1 ||
col_schema->parquet_schema.precision > max_precision ||
col_schema->parquet_schema.scale > max_precision) {
return false;
}
if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
min_value = _decode_binary_decimal<CppType>(col_schema, encoded_min,
predicate.scale);
max_value = _decode_binary_decimal<CppType>(col_schema, encoded_max,
predicate.scale);
} else if (physical_type == tparquet::Type::INT32) {
min_value = _decode_primitive_decimal<CppType, Int32>(col_schema, encoded_min,
predicate.scale);
max_value = _decode_primitive_decimal<CppType, Int32>(col_schema, encoded_max,
predicate.scale);
} else if (physical_type == tparquet::Type::INT64) {
min_value = _decode_primitive_decimal<CppType, Int64>(col_schema, encoded_min,
predicate.scale);
max_value = _decode_primitive_decimal<CppType, Int64>(col_schema, encoded_max,
predicate.scale);
} else {
return false;
}
} else {
return false;
}
break;
case TYPE_DATE:
[[fallthrough]];
case TYPE_DATEV2:
if (physical_type == tparquet::Type::INT32) {
int64_t min_date_value =
static_cast<int64_t>(*reinterpret_cast<const int32_t*>(encoded_min.data()));
int64_t max_date_value =
static_cast<int64_t>(*reinterpret_cast<const int32_t*>(encoded_max.data()));
if constexpr (std::is_same_v<CppType, VecDateTimeValue> ||
std::is_same_v<CppType, DateV2Value<DateV2ValueType>>) {
min_value.from_unixtime(min_date_value * 24 * 60 * 60, ctz);
max_value.from_unixtime(max_date_value * 24 * 60 * 60, ctz);
} else {
return false;
}
} else {
return false;
}
break;
case TYPE_DATETIME:
[[fallthrough]];
case TYPE_DATETIMEV2:
if (physical_type == tparquet::Type::INT96) {
ParquetInt96 datetime96_min =
*reinterpret_cast<const ParquetInt96*>(encoded_min.data());
int64_t micros_min = datetime96_min.to_timestamp_micros();
ParquetInt96 datetime96_max =
*reinterpret_cast<const ParquetInt96*>(encoded_max.data());
int64_t micros_max = datetime96_max.to_timestamp_micros();
// From Trino: Parquet INT96 timestamp values were compared incorrectly
// for the purposes of producing statistics by older parquet writers,
// so PARQUET-1065 deprecated them. The result is that any writer that produced stats
// was producing unusable incorrect values, except the special case where min == max
// and an incorrect ordering would not be material to the result.
// PARQUET-1026 made binary stats available and valid in that special case.
if (micros_min != micros_max) {
return false;
}
if constexpr (std::is_same_v<CppType, VecDateTimeValue> ||
std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
min_value.from_unixtime(micros_min / 1000000, ctz);
max_value.from_unixtime(micros_max / 1000000, ctz);
if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
min_value.set_microsecond(micros_min % 1000000);
max_value.set_microsecond(micros_max % 1000000);
}
} else {
return false;
}
} else if (physical_type == tparquet::Type::INT64) {
int64_t date_value_min = *reinterpret_cast<const int64_t*>(encoded_min.data());
int64_t date_value_max = *reinterpret_cast<const int64_t*>(encoded_max.data());
int64_t second_mask = 1;
int64_t scale_to_nano_factor = 1;
cctz::time_zone resolved_ctz = ctz;
const auto& schema = col_schema->parquet_schema;
if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) {
const auto& timestamp_info = schema.logicalType.TIMESTAMP;
if (!timestamp_info.isAdjustedToUTC) {
// should set timezone to utc+0
resolved_ctz = cctz::utc_time_zone();
}
const auto& time_unit = timestamp_info.unit;
if (time_unit.__isset.MILLIS) {
second_mask = 1000;
scale_to_nano_factor = 1000000;
} else if (time_unit.__isset.MICROS) {
second_mask = 1000000;
scale_to_nano_factor = 1000;
} else if (time_unit.__isset.NANOS) {
second_mask = 1000000000;
scale_to_nano_factor = 1;
}
} else if (schema.__isset.converted_type) {
const auto& converted_type = schema.converted_type;
if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) {
second_mask = 1000;
scale_to_nano_factor = 1000000;
} else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) {
second_mask = 1000000;
scale_to_nano_factor = 1000;
}
}
if constexpr (std::is_same_v<CppType, VecDateTimeValue> ||
std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
min_value.from_unixtime(date_value_min / second_mask, resolved_ctz);
max_value.from_unixtime(date_value_max / second_mask, resolved_ctz);
if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
min_value.set_microsecond((date_value_min % second_mask) *
scale_to_nano_factor / 1000);
max_value.set_microsecond((date_value_max % second_mask) *
scale_to_nano_factor / 1000);
}
} else {
return false;
}
} else {
return false;
}
break;
default:
return false;
}
return _filter_by_min_max(predicate.op, predicate_values, min_value, max_value);
}