table/sorting.go (145 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"encoding/json"
"errors"
"fmt"
"slices"
"strings"
"github.com/apache/iceberg-go"
)
// SortDirection is the direction (ascending or descending) in which a sort
// field orders its values. It is encoded in JSON as its string value.
type SortDirection string
// Valid SortDirection values. SortField.UnmarshalJSON rejects any other value
// with ErrInvalidSortDirection.
const (
// SortASC sorts in ascending order.
SortASC SortDirection = "asc"
// SortDESC sorts in descending order.
SortDESC SortDirection = "desc"
)
// NullOrder describes where null values are placed when sorting. It is
// encoded in JSON as its string value.
type NullOrder string
// Valid NullOrder values. SortField.UnmarshalJSON rejects any other value
// with ErrInvalidNullOrder.
const (
// NullsFirst orders null values first.
NullsFirst NullOrder = "nulls-first"
// NullsLast orders null values last.
NullsLast NullOrder = "nulls-last"
)
// Sentinel errors returned by SortField.UnmarshalJSON when a decoded sort
// field carries an unsupported enum value.
var (
// ErrInvalidSortDirection is returned when the decoded direction is
// neither SortASC nor SortDESC.
ErrInvalidSortDirection = errors.New("invalid sort direction, must be 'asc' or 'desc'")
// ErrInvalidNullOrder is returned when the decoded null order is neither
// NullsFirst nor NullsLast.
ErrInvalidNullOrder = errors.New("invalid null order, must be 'nulls-first' or 'nulls-last'")
)
// SortField describes a field used in a sort order definition.
type SortField struct {
// SourceID is the source column id from the table's schema.
SourceID int `json:"source-id"`
// Transform is the transformation used to produce values to be
// sorted on from the source column.
Transform iceberg.Transform `json:"transform"`
// Direction is an enum indicating ascending or descending direction.
Direction SortDirection `json:"direction"`
// NullOrder describes the order of null values when sorting. It
// should only be one of the nulls-first or nulls-last enum values.
NullOrder NullOrder `json:"null-order"`
}
// String returns a human-readable form of the sort field.
//
// A field using the identity transform is rendered as
// "<source-id> <direction> <null-order>"; any other transform is rendered as
// "<transform>(<source-id>) <direction> <null-order>".
func (s *SortField) String() string {
	switch s.Transform.(type) {
	case iceberg.IdentityTransform:
		// The identity transform adds no information, so it is left out.
		return fmt.Sprintf("%d %s %s", s.SourceID, s.Direction, s.NullOrder)
	default:
		return fmt.Sprintf("%s(%d) %s %s", s.Transform, s.SourceID, s.Direction, s.NullOrder)
	}
}
// MarshalJSON encodes the sort field as JSON, filling in defaults for any
// unset ordering attribute: an empty Direction is written as SortASC, and an
// empty NullOrder is written as NullsFirst for an ascending field and
// NullsLast otherwise.
//
// The defaults are applied to a copy of the field, so marshaling never
// modifies the receiver.
func (s *SortField) MarshalJSON() ([]byte, error) {
	// Alias has the same fields as SortField but none of its methods, which
	// keeps json.Marshal from calling back into this method.
	type Alias SortField

	out := Alias(*s)
	if out.Direction == "" {
		out.Direction = SortASC
	}

	if out.NullOrder == "" {
		if out.Direction == SortASC {
			out.NullOrder = NullsFirst
		} else {
			out.NullOrder = NullsLast
		}
	}

	return json.Marshal(&out)
}
// UnmarshalJSON decodes a sort field from JSON and validates it.
//
// The "transform" value is read as a string and converted with
// iceberg.ParseTransform. After decoding, the direction must be SortASC or
// SortDESC and the null order must be NullsFirst or NullsLast; otherwise
// ErrInvalidSortDirection or ErrInvalidNullOrder is returned, in that order
// of precedence.
func (s *SortField) UnmarshalJSON(b []byte) error {
	// Alias has SortField's fields but not its methods, so decoding into it
	// does not re-enter this method.
	type Alias SortField

	// The outer "transform" field takes precedence over the embedded one,
	// so the transform is captured as a plain string while every other key
	// is decoded straight into s through the embedded pointer.
	var raw struct {
		TransformString string `json:"transform"`
		*Alias
	}
	raw.Alias = (*Alias)(s)

	if err := json.Unmarshal(b, &raw); err != nil {
		return err
	}

	transform, err := iceberg.ParseTransform(raw.TransformString)
	s.Transform = transform
	if err != nil {
		return err
	}

	if s.Direction != SortASC && s.Direction != SortDESC {
		return ErrInvalidSortDirection
	}

	if s.NullOrder != NullsFirst && s.NullOrder != NullsLast {
		return ErrInvalidNullOrder
	}

	return nil
}
// Well-known sort order IDs.
const (
// InitialSortOrderID is the order ID that SortOrder.UnmarshalJSON assigns
// to a sort order that has fields but an order ID of 0, and the ID that
// AssignFreshSortOrderIDs gives to the sort order it returns.
InitialSortOrderID = 1
// UnsortedSortOrderID is the order ID of UnsortedSortOrder.
UnsortedSortOrderID = 0
)
// UnsortedSortOrder is a default sort order indicating no sort order at all:
// it has the order ID UnsortedSortOrderID and an empty, non-nil field list.
var UnsortedSortOrder = SortOrder{OrderID: UnsortedSortOrderID, Fields: []SortField{}}
// SortOrder describes how the data is sorted within the table.
//
// Data can be sorted within partitions by columns to gain performance. The
// order of the sort fields within the list defines the order in which the
// sort is applied to the data.
type SortOrder struct {
// OrderID is the ID of this sort order, encoded in JSON as "order-id".
OrderID int `json:"order-id"`
// Fields lists the sort fields in the order in which they are applied,
// encoded in JSON as "fields".
Fields []SortField `json:"fields"`
}
// Equals reports whether s and rhs have the same order ID and the same sort
// fields in the same sequence.
func (s SortOrder) Equals(rhs SortOrder) bool {
	if s.OrderID != rhs.OrderID {
		return false
	}

	return slices.Equal(s.Fields, rhs.Fields)
}
// String returns a human-readable form of the sort order: the order ID
// followed by ": " and a bracketed list of the sort fields.
//
// An order without fields is rendered as "<id>: []". Otherwise the opening
// bracket is followed by a newline and every field is written on its own
// line before the closing bracket.
func (s SortOrder) String() string {
	var sb strings.Builder

	fmt.Fprintf(&sb, "%d: [", s.OrderID)
	if len(s.Fields) > 0 {
		sb.WriteByte('\n')
	}

	for i := range s.Fields {
		sb.WriteString(s.Fields[i].String())
		sb.WriteByte('\n')
	}

	sb.WriteByte(']')

	return sb.String()
}
// UnmarshalJSON decodes a sort order from JSON and normalizes its order ID.
//
// A decoded order without fields gets an empty, non-nil field list and an
// order ID of 0, whatever ID the JSON carried. A decoded order that has
// fields but an order ID of 0 is given InitialSortOrderID instead.
func (s *SortOrder) UnmarshalJSON(b []byte) error {
	// Alias has SortOrder's fields but not its methods, so decoding into it
	// does not re-enter this method.
	type Alias SortOrder

	if err := json.Unmarshal(b, (*Alias)(s)); err != nil {
		return err
	}

	switch {
	case len(s.Fields) == 0:
		s.Fields = []SortField{}
		s.OrderID = 0
	case s.OrderID == 0:
		// initialize default sort order id
		s.OrderID = InitialSortOrderID
	}

	return nil
}
// AssignFreshSortOrderIDs updates and reassigns the field source IDs from the old schema
// to the corresponding fields in the fresh schema, while also giving the Sort Order a fresh
// ID of InitialSortOrderID (1).
//
// It is shorthand for calling AssignFreshSortOrderIDsWithID with InitialSortOrderID as the
// sort order ID, and returns whatever that call returns.
func AssignFreshSortOrderIDs(sortOrder SortOrder, old, fresh *iceberg.Schema) (SortOrder, error) {
return AssignFreshSortOrderIDsWithID(sortOrder, old, fresh, InitialSortOrderID)
}
// AssignFreshSortOrderIDsWithID is like AssignFreshSortOrderIDs but allows specifying the id of the
// returned SortOrder.
//
// For every sort field, the source ID is resolved to a column name in the old schema and that
// name is then looked up in the fresh schema to obtain the new source ID. The transform,
// direction and null order of each field are carried over unchanged.
//
// A sort order without fields is treated as unsorted regardless of its order ID: it is
// returned as UnsortedSortOrder and sortOrderID is ignored. This matches
// SortOrder.UnmarshalJSON, which also gives every field-less sort order the ID 0.
//
// An error is returned if a source ID cannot be found in the old schema, or if the resolved
// column name cannot be found in the fresh schema.
func AssignFreshSortOrderIDsWithID(sortOrder SortOrder, old, fresh *iceberg.Schema, sortOrderID int) (SortOrder, error) {
	if len(sortOrder.Fields) == 0 {
		return UnsortedSortOrder, nil
	}

	fields := make([]SortField, 0, len(sortOrder.Fields))
	for _, field := range sortOrder.Fields {
		originalField, ok := old.FindColumnName(field.SourceID)
		if !ok {
			// Report the id that was actually looked up, not the whole formatted field.
			return SortOrder{}, fmt.Errorf("cannot find source column id %d in old schema", field.SourceID)
		}

		freshField, ok := fresh.FindFieldByName(originalField)
		if !ok {
			return SortOrder{}, fmt.Errorf("cannot find field %s in fresh schema", originalField)
		}

		fields = append(fields, SortField{
			SourceID:  freshField.ID,
			Transform: field.Transform,
			Direction: field.Direction,
			NullOrder: field.NullOrder,
		})
	}

	return SortOrder{OrderID: sortOrderID, Fields: fields}, nil
}