cpp/src/parquet/geospatial/statistics.cc (296 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/geospatial/statistics.h"
#include <cmath>
#include <memory>
#include <optional>
#include "arrow/array.h"
#include "arrow/type.h"
#include "arrow/util/bit_run_reader.h"
#include "parquet/exception.h"
#include "parquet/geospatial/util_internal.h"
using arrow::util::SafeLoad;
namespace parquet::geospatial {
class GeoStatisticsImpl {
public:
bool Equals(const GeoStatisticsImpl& other) const {
return is_valid_ == other.is_valid_ &&
bounder_.GeometryTypes() == other.bounder_.GeometryTypes() &&
bounder_.Bounds() == other.bounder_.Bounds();
}
void Merge(const GeoStatisticsImpl& other) {
is_valid_ = is_valid_ && other.is_valid_;
if (!is_valid_) {
return;
}
// We don't yet support updating wraparound bounds. Rather than throw,
// we just mark the X bounds as invalid such that they are not used.
auto other_bounds = other.bounder_.Bounds();
if (is_wraparound_x() || other.is_wraparound_x()) {
other_bounds.min[0] = kNaN;
other_bounds.max[0] = kNaN;
}
bounder_.MergeBox(other_bounds);
std::vector<int32_t> other_geometry_types = other.bounder_.GeometryTypes();
bounder_.MergeGeometryTypes(other_geometry_types);
}
void Update(const ByteArray* values, int64_t num_values) {
if (!is_valid_) {
return;
}
if (is_wraparound_x()) {
throw ParquetException("Wraparound X is not suppored by GeoStatistics::Update()");
}
for (int64_t i = 0; i < num_values; i++) {
const ByteArray& item = values[i];
try {
bounder_.MergeGeometry({reinterpret_cast<const char*>(item.ptr), item.len});
} catch (ParquetException&) {
is_valid_ = false;
return;
}
}
}
void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
int64_t valid_bits_offset, int64_t num_spaced_values,
int64_t num_values) {
DCHECK_GT(num_spaced_values, 0);
if (!is_valid_) {
return;
}
if (is_wraparound_x()) {
throw ParquetException("Wraparound X is not suppored by GeoStatistics::Update()");
}
::arrow::Status status = ::arrow::internal::VisitSetBitRuns(
valid_bits, valid_bits_offset, num_spaced_values,
[&](int64_t position, int64_t length) {
for (int64_t i = 0; i < length; i++) {
ByteArray item = SafeLoad(values + i + position);
PARQUET_CATCH_NOT_OK(bounder_.MergeGeometry(
{reinterpret_cast<const char*>(item.ptr), item.len}));
}
return ::arrow::Status::OK();
});
if (!status.ok()) {
is_valid_ = false;
}
}
void Update(const ::arrow::Array& values) {
if (!is_valid_) {
return;
}
if (is_wraparound_x()) {
throw ParquetException("Wraparound X is not suppored by GeoStatistics::Update()");
}
// Note that ::arrow::Type::EXTENSION seems to be handled before this is called
switch (values.type_id()) {
case ::arrow::Type::BINARY:
UpdateArrayImpl<::arrow::BinaryArray>(values);
break;
case ::arrow::Type::LARGE_BINARY:
UpdateArrayImpl<::arrow::LargeBinaryArray>(values);
break;
// This does not currently handle run-end encoded, dictionary encodings, or views
default:
throw ParquetException("Unsupported Array type in GeoStatistics::Update(Array): ",
values.type()->ToString());
}
}
void Reset() {
bounder_.Reset();
is_valid_ = true;
has_geometry_types_ = true;
}
EncodedGeoStatistics Encode() const {
if (!is_valid_) {
return {};
}
const geospatial::BoundingBox::XYZM& mins = bounder_.Bounds().min;
const geospatial::BoundingBox::XYZM& maxes = bounder_.Bounds().max;
EncodedGeoStatistics out;
if (has_geometry_types_) {
out.geospatial_types = bounder_.GeometryTypes();
}
bool write_x = !bound_empty(0) && bound_valid(0);
bool write_y = !bound_empty(1) && bound_valid(1);
bool write_z = write_x && write_y && !bound_empty(2) && bound_valid(2);
bool write_m = write_x && write_y && !bound_empty(3) && bound_valid(3);
if (write_x && write_y) {
out.xmin = mins[0];
out.xmax = maxes[0];
out.ymin = mins[1];
out.ymax = maxes[1];
out.xy_bounds_present = true;
}
if (write_z) {
out.zmin = mins[2];
out.zmax = maxes[2];
out.z_bounds_present = true;
}
if (write_m) {
out.mmin = mins[3];
out.mmax = maxes[3];
out.m_bounds_present = true;
}
return out;
}
void Decode(const EncodedGeoStatistics& encoded) {
Reset();
geospatial::BoundingBox box;
if (encoded.xy_bounds_present) {
box.min[0] = encoded.xmin;
box.max[0] = encoded.xmax;
box.min[1] = encoded.ymin;
box.max[1] = encoded.ymax;
} else {
box.min[0] = kNaN;
box.max[0] = kNaN;
box.min[1] = kNaN;
box.max[1] = kNaN;
}
if (encoded.z_bounds_present) {
box.min[2] = encoded.zmin;
box.max[2] = encoded.zmax;
} else {
box.min[2] = kNaN;
box.max[2] = kNaN;
}
if (encoded.m_bounds_present) {
box.min[3] = encoded.mmin;
box.max[3] = encoded.mmax;
} else {
box.min[3] = kNaN;
box.max[3] = kNaN;
}
bounder_.MergeBox(box);
bounder_.MergeGeometryTypes(encoded.geospatial_types);
has_geometry_types_ = has_geometry_types_ && encoded.geospatial_types_present();
}
bool is_wraparound_x() const {
return is_wraparound(lower_bound()[0], upper_bound()[0]);
}
bool is_valid() const { return is_valid_; }
bool bounds_empty() const {
for (int i = 0; i < kMaxDimensions; i++) {
if (!bound_empty(i)) {
return false;
}
}
return true;
}
bool bound_empty(int i) const {
return std::isinf(bounder_.Bounds().min[i] - bounder_.Bounds().max[i]);
}
bool bound_valid(int i) const {
return !std::isnan(bounder_.Bounds().min[i]) && !std::isnan(bounder_.Bounds().max[i]);
}
const std::array<double, kMaxDimensions>& lower_bound() const {
return bounder_.Bounds().min;
}
const std::array<double, kMaxDimensions>& upper_bound() const {
return bounder_.Bounds().max;
}
std::optional<std::vector<int32_t>> geometry_types() const {
if (has_geometry_types_) {
return bounder_.GeometryTypes();
} else {
return std::nullopt;
}
}
private:
geospatial::WKBGeometryBounder bounder_;
bool is_valid_ = true;
bool has_geometry_types_ = true;
template <typename ArrayType>
void UpdateArrayImpl(const ::arrow::Array& values) {
const auto& binary_array = static_cast<const ArrayType&>(values);
for (int64_t i = 0; i < binary_array.length(); ++i) {
if (!binary_array.IsNull(i)) {
try {
bounder_.MergeGeometry(binary_array.GetView(i));
} catch (ParquetException&) {
is_valid_ = false;
return;
}
}
}
}
// The Parquet specification allows X bounds to be "wraparound" to allow for
// more compact bounding boxes when a geometry happens to include components
// on both sides of the antimeridian (e.g., the nation of Fiji). This function
// checks for that case (see GeoStatistics::lower_bound/upper_bound for more
// details).
static bool is_wraparound(double xmin, double xmax) {
return !std::isinf(xmin - xmax) && xmin > xmax;
}
};
GeoStatistics::GeoStatistics() : impl_(std::make_unique<GeoStatisticsImpl>()) {}
GeoStatistics::GeoStatistics(const EncodedGeoStatistics& encoded) : GeoStatistics() {
Decode(encoded);
}
GeoStatistics::~GeoStatistics() = default;
bool GeoStatistics::Equals(const GeoStatistics& other) const {
return impl_->Equals(*other.impl_);
}
void GeoStatistics::Merge(const GeoStatistics& other) { impl_->Merge(*other.impl_); }
void GeoStatistics::Update(const ByteArray* values, int64_t num_values) {
impl_->Update(values, num_values);
}
void GeoStatistics::UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
int64_t valid_bits_offset, int64_t num_spaced_values,
int64_t num_values) {
impl_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_spaced_values,
num_values);
}
void GeoStatistics::Update(const ::arrow::Array& values) { impl_->Update(values); }
void GeoStatistics::Reset() { impl_->Reset(); }
bool GeoStatistics::is_valid() const { return impl_->is_valid(); }
std::optional<EncodedGeoStatistics> GeoStatistics::Encode() const {
if (is_valid()) {
return impl_->Encode();
} else {
return std::nullopt;
}
}
void GeoStatistics::Decode(const EncodedGeoStatistics& encoded) {
impl_->Decode(encoded);
}
std::array<double, 4> GeoStatistics::lower_bound() const { return impl_->lower_bound(); }
std::array<double, 4> GeoStatistics::upper_bound() const { return impl_->upper_bound(); }
std::array<bool, 4> GeoStatistics::dimension_empty() const {
return {impl_->bound_empty(0), impl_->bound_empty(1), impl_->bound_empty(2),
impl_->bound_empty(3)};
}
std::array<bool, 4> GeoStatistics::dimension_valid() const {
return {impl_->bound_valid(0), impl_->bound_valid(1), impl_->bound_valid(2),
impl_->bound_valid(3)};
}
std::optional<std::vector<int32_t>> GeoStatistics::geometry_types() const {
return impl_->geometry_types();
}
std::string GeoStatistics::ToString() const {
if (!is_valid()) {
return "<GeoStatistics> invalid";
}
std::stringstream ss;
ss << "<GeoStatistics>";
std::string dim_label("xyzm");
auto dim_valid = dimension_valid();
auto dim_empty = dimension_empty();
auto lower = lower_bound();
auto upper = upper_bound();
for (int i = 0; i < kMaxDimensions; i++) {
ss << " " << dim_label[i] << ": ";
if (!dim_valid[i]) {
ss << "invalid";
} else if (dim_empty[i]) {
ss << "empty";
} else {
ss << "[" << lower[i] << ", " << upper[i] << "]";
}
}
std::optional<std::vector<int32_t>> maybe_geometry_types = geometry_types();
ss << " geometry_types: ";
if (maybe_geometry_types.has_value()) {
ss << "[";
std::string sep("");
for (int32_t geometry_type : *maybe_geometry_types) {
ss << sep << geometry_type;
sep = ", ";
}
ss << "]";
} else {
ss << "invalid";
}
return ss.str();
}
} // namespace parquet::geospatial