parquet/pqarrow/variant.go (82 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pqarrow
import (
"fmt"
"reflect"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/parquet/schema"
)
// variantArray is the array type reported by variantExtensionType.ArrayType.
// It embeds array.ExtensionArrayBase and adds nothing of its own.
//
// The variant extension is experimental and is not yet fully supported.
type variantArray struct {
array.ExtensionArrayBase
}
// variantExtensionType is an experimental extension type that supports
// semi-structured objects that can be composed of primitives, arrays, and
// objects which can be queried by path.
//
// Unshredded variant representation:
//
//	optional group variant_name (VARIANT) {
//	  required binary metadata;
//	  required binary value;
//	}
//
// To read more about variant encoding, see the variant encoding spec at
// https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
//
// To read more about variant shredding, see the variant shredding spec at
// https://github.com/apache/parquet-format/blob/master/VariantShredding.md
type variantExtensionType struct {
arrow.ExtensionBase
// TODO: add shredded_value

// metadata is the storage struct field named "metadata".
metadata arrow.Field
// value is the other field of the two-field storage struct, i.e. the one
// named "value".
value arrow.Field
}
// ParquetLogicalType returns the Parquet logical type associated with this
// extension type: a zero-valued schema.VariantLogicalType.
func (*variantExtensionType) ParquetLogicalType() schema.LogicalType {
	var logicalType schema.VariantLogicalType
	return logicalType
}
// isBinaryField reports whether the field's data type is binary or
// large binary.
func isBinaryField(f arrow.Field) bool {
	switch f.Type.ID() {
	case arrow.BINARY, arrow.LARGE_BINARY:
		return true
	default:
		return false
	}
}
// isSupportedVariantStorage reports whether dt can back a variant extension
// type.
//
// Only unshredded variants are supported for now, so the storage type must be
// a struct with exactly two fields, "metadata" and "value", both of which are
// non-nullable binary (or large binary) fields.
//
// In shredded variants, the binary value field can be replaced with one or
// more of the following: object, array, typed_value, and variant_value.
func isSupportedVariantStorage(dt arrow.DataType) bool {
	st, isStruct := dt.(*arrow.StructType)
	if !isStruct || st.NumFields() != 2 {
		return false
	}

	// The relative order of the two fields is irrelevant here; only their
	// presence is checked. The constructor sorts out which one is which.
	md, hasMetadata := st.FieldByName("metadata")
	if !hasMetadata {
		return false
	}
	val, hasValue := st.FieldByName("value")
	if !hasValue {
		return false
	}

	// Unshredded variants require both fields to be binary...
	if !isBinaryField(md) || !isBinaryField(val) {
		return false
	}
	// ...and neither of them may be nullable.
	return !(md.Nullable || val.Nullable)
}
// newVariantType builds a variantExtensionType backed by storageType.
//
// storageType must be accepted by isSupportedVariantStorage; otherwise an
// error wrapping arrow.ErrInvalid is returned. A nil storageType is rejected
// with that same error instead of panicking while the message is formatted.
//
// NOTE: this is still experimental, a future change will add shredding support.
func newVariantType(storageType arrow.DataType) (*variantExtensionType, error) {
	if !isSupportedVariantStorage(storageType) {
		// Format the value with %v rather than calling storageType.String():
		// the output is the same for a non-nil type, while a nil interface is
		// rendered as "<nil>" instead of triggering a nil method call.
		return nil, fmt.Errorf("%w: invalid storage type for unshredded variant: %v",
			arrow.ErrInvalid, storageType)
	}

	// shredded variants will eventually need to handle an optional shredded_value
	// as well as value being optional
	dt := storageType.(*arrow.StructType)

	// The storage was validated to hold exactly the "metadata" and "value"
	// fields, in either order; swap if metadata is not the first one.
	mdField, valField := dt.Field(0), dt.Field(1)
	if mdField.Name != "metadata" {
		mdField, valField = valField, mdField
	}

	return &variantExtensionType{
		ExtensionBase: arrow.ExtensionBase{Storage: storageType},
		metadata:      mdField,
		value:         valField,
	}, nil
}
// Metadata returns the storage field recorded as the variant's metadata field.
func (v *variantExtensionType) Metadata() arrow.Field {
	return v.metadata
}
// Value returns the storage field recorded as the variant's value field.
func (v *variantExtensionType) Value() arrow.Field {
	return v.value
}
// ArrayType returns the reflect.Type of variantArray, the array
// implementation paired with this extension type.
func (*variantExtensionType) ArrayType() reflect.Type {
	// Derive the struct type from a nil pointer so no variantArray value has
	// to be constructed.
	return reflect.TypeOf((*variantArray)(nil)).Elem()
}
// ExtensionName returns the name identifying this extension type,
// "parquet.variant".
func (*variantExtensionType) ExtensionName() string {
	const name = "parquet.variant"
	return name
}
// String returns the extension name wrapped as "extension<NAME>".
func (v *variantExtensionType) String() string {
	return "extension<" + v.ExtensionName() + ">"
}
// ExtensionEquals reports whether other has the same extension name as v and
// a storage type that arrow.TypeEqual considers equal to v's storage type.
func (v *variantExtensionType) ExtensionEquals(other arrow.ExtensionType) bool {
	if v.ExtensionName() != other.ExtensionName() {
		return false
	}
	return arrow.TypeEqual(v.Storage, other.StorageType())
}
// Serialize returns the empty string. Deserialize ignores its serialized-data
// argument and rebuilds the type from the storage type alone.
func (*variantExtensionType) Serialize() string {
	return ""
}
// Deserialize reconstructs a variant extension type from storageType. The
// serialized-data argument is ignored.
//
// It returns the error from newVariantType when storageType is not a
// supported variant storage type; in that case the returned ExtensionType is
// a true nil interface.
func (*variantExtensionType) Deserialize(storageType arrow.DataType, _ string) (arrow.ExtensionType, error) {
	vt, err := newVariantType(storageType)
	if err != nil {
		// Return a literal nil: forwarding the nil *variantExtensionType
		// directly would wrap it in a non-nil arrow.ExtensionType interface.
		return nil, err
	}
	return vt, nil
}