table/evaluators.go (911 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package table import ( "fmt" "math" "slices" "github.com/apache/arrow-go/v18/parquet/metadata" "github.com/apache/iceberg-go" "github.com/google/uuid" ) const ( rowsMightMatch, rowsMustMatch = true, true rowsCannotMatch, rowsMightNotMatch = false, false inPredicateLimit = 200 ) // newManifestEvaluator returns a function that can be used to evaluate whether a particular // manifest file has rows that might or might not match a given partition filter by using // the stats provided in the partitions (UpperBound/LowerBound/ContainsNull/ContainsNaN). func newManifestEvaluator(spec iceberg.PartitionSpec, schema *iceberg.Schema, partitionFilter iceberg.BooleanExpression, caseSensitive bool) (func(iceberg.ManifestFile) (bool, error), error) { partType := spec.PartitionType(schema) partSchema := iceberg.NewSchema(0, partType.FieldList...) filter, err := iceberg.RewriteNotExpr(partitionFilter) if err != nil { return nil, err } boundFilter, err := iceberg.BindExpr(partSchema, filter, caseSensitive) if err != nil { return nil, err } return (&manifestEvalVisitor{partitionFilter: boundFilter}).Eval, nil } type manifestEvalVisitor struct { partitionFields []iceberg.FieldSummary partitionFilter iceberg.BooleanExpression } func (m *manifestEvalVisitor) Eval(manifest iceberg.ManifestFile) (bool, error) { if parts := manifest.Partitions(); len(parts) > 0 { m.partitionFields = parts return iceberg.VisitExpr(m.partitionFilter, m) } return rowsMightMatch, nil } func removeBoundCmp[T iceberg.LiteralType](bound iceberg.Literal, vals []iceberg.Literal, cmpToDelete int) []iceberg.Literal { val := bound.(iceberg.TypedLiteral[T]) cmp := val.Comparator() return slices.DeleteFunc(vals, func(v iceberg.Literal) bool { return cmp(val.Value(), v.(iceberg.TypedLiteral[T]).Value()) == cmpToDelete }) } func removeBoundCheck(bound iceberg.Literal, vals []iceberg.Literal, toDelete int) []iceberg.Literal { switch bound.Type().(type) { case iceberg.BooleanType: return removeBoundCmp[bool](bound, vals, toDelete) case iceberg.Int32Type: return removeBoundCmp[int32](bound, vals, toDelete) case iceberg.Int64Type: return removeBoundCmp[int64](bound, vals, toDelete) case iceberg.Float32Type: return removeBoundCmp[float32](bound, vals, toDelete) case iceberg.Float64Type: return removeBoundCmp[float64](bound, vals, toDelete) case iceberg.DateType: return removeBoundCmp[iceberg.Date](bound, vals, toDelete) case iceberg.TimeType: return removeBoundCmp[iceberg.Time](bound, vals, toDelete) case iceberg.TimestampType, iceberg.TimestampTzType: return removeBoundCmp[iceberg.Timestamp](bound, vals, toDelete) case iceberg.BinaryType, iceberg.FixedType: return removeBoundCmp[[]byte](bound, vals, toDelete) case iceberg.StringType: return removeBoundCmp[string](bound, vals, toDelete) case iceberg.UUIDType: return removeBoundCmp[uuid.UUID](bound, vals, toDelete) case iceberg.DecimalType: return removeBoundCmp[iceberg.Decimal](bound, vals, toDelete) } panic("unrecognized type") } func allBoundCmp[T iceberg.LiteralType](bound iceberg.Literal, set iceberg.Set[iceberg.Literal], want int) bool { val := bound.(iceberg.TypedLiteral[T]) cmp := val.Comparator() return set.All(func(e iceberg.Literal) bool { return cmp(val.Value(), e.(iceberg.TypedLiteral[T]).Value()) == want }) } func allBoundCheck(bound iceberg.Literal, set iceberg.Set[iceberg.Literal], want int) bool { switch bound.Type().(type) { case iceberg.BooleanType: return allBoundCmp[bool](bound, set, want) case iceberg.Int32Type: return allBoundCmp[int32](bound, set, want) case iceberg.Int64Type: return allBoundCmp[int64](bound, set, want) case iceberg.Float32Type: return allBoundCmp[float32](bound, set, want) case iceberg.Float64Type: return allBoundCmp[float64](bound, set, want) case iceberg.DateType: return allBoundCmp[iceberg.Date](bound, set, want) case iceberg.TimeType: return allBoundCmp[iceberg.Time](bound, set, want) case iceberg.TimestampType, iceberg.TimestampTzType: return allBoundCmp[iceberg.Timestamp](bound, set, want) case iceberg.BinaryType, iceberg.FixedType: return allBoundCmp[[]byte](bound, set, want) case iceberg.StringType: return allBoundCmp[string](bound, set, want) case iceberg.UUIDType: return allBoundCmp[uuid.UUID](bound, set, want) case iceberg.DecimalType: return allBoundCmp[iceberg.Decimal](bound, set, want) } panic(iceberg.ErrType) } func (m *manifestEvalVisitor) VisitIn(term iceberg.BoundTerm, literals iceberg.Set[iceberg.Literal]) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.LowerBound == nil { return rowsCannotMatch } if literals.Len() > inPredicateLimit { return rowsMightMatch } lower, err := iceberg.LiteralFromBytes(term.Type(), *field.LowerBound) if err != nil { panic(err) } if allBoundCheck(lower, literals, 1) { return rowsCannotMatch } if field.UpperBound != nil { upper, err := iceberg.LiteralFromBytes(term.Type(), *field.UpperBound) if err != nil { panic(err) } if allBoundCheck(upper, literals, -1) { return rowsCannotMatch } } return rowsMightMatch } func (m *manifestEvalVisitor) VisitNotIn(term iceberg.BoundTerm, literals iceberg.Set[iceberg.Literal]) bool { // because the bounds are not necessarily a min or max value, this cannot be answered using them // notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col return rowsMightMatch } func (m *manifestEvalVisitor) VisitIsNan(term iceberg.BoundTerm) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.ContainsNaN != nil && !*field.ContainsNaN { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitNotNan(term iceberg.BoundTerm) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.ContainsNaN != nil && *field.ContainsNaN && !field.ContainsNull && field.LowerBound == nil { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitIsNull(term iceberg.BoundTerm) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if !field.ContainsNull { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitNotNull(term iceberg.BoundTerm) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] // ContainsNull encodes whether at least one partition value is null // lowerBound is null if all partition values are null allNull := field.ContainsNull && field.LowerBound == nil if allNull && (term.Ref().Type().Equals(iceberg.PrimitiveTypes.Float32) || term.Ref().Type().Equals(iceberg.PrimitiveTypes.Float64)) { // floating point types may include NaN values, which we check separately // in case bounds don't include NaN values, ContainsNaN needsz to be checked allNull = field.ContainsNaN != nil && !*field.ContainsNaN } if allNull { return rowsCannotMatch } return rowsMightMatch } func getCmp[T iceberg.LiteralType](b iceberg.TypedLiteral[T]) func(iceberg.Literal, iceberg.Literal) int { cmp := b.Comparator() return func(l1, l2 iceberg.Literal) int { return cmp(l1.(iceberg.TypedLiteral[T]).Value(), l2.(iceberg.TypedLiteral[T]).Value()) } } func getCmpLiteral(boundary iceberg.Literal) func(iceberg.Literal, iceberg.Literal) int { switch l := boundary.(type) { case iceberg.TypedLiteral[bool]: return getCmp(l) case iceberg.TypedLiteral[int32]: return getCmp(l) case iceberg.TypedLiteral[int64]: return getCmp(l) case iceberg.TypedLiteral[float32]: return getCmp(l) case iceberg.TypedLiteral[float64]: return getCmp(l) case iceberg.TypedLiteral[iceberg.Date]: return getCmp(l) case iceberg.TypedLiteral[iceberg.Time]: return getCmp(l) case iceberg.TypedLiteral[iceberg.Timestamp]: return getCmp(l) case iceberg.TypedLiteral[[]byte]: return getCmp(l) case iceberg.TypedLiteral[string]: return getCmp(l) case iceberg.TypedLiteral[uuid.UUID]: return getCmp(l) case iceberg.TypedLiteral[iceberg.Decimal]: return getCmp(l) } panic(iceberg.ErrType) } func (m *manifestEvalVisitor) VisitEqual(term iceberg.BoundTerm, lit iceberg.Literal) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.LowerBound == nil || field.UpperBound == nil { // values are all null and literal cannot contain null return rowsCannotMatch } lower, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.LowerBound) if err != nil { panic(err) } cmp := getCmpLiteral(lower) if cmp(lower, lit) == 1 { return rowsCannotMatch } upper, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.UpperBound) if err != nil { panic(err) } if cmp(lit, upper) == 1 { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitNotEqual(term iceberg.BoundTerm, lit iceberg.Literal) bool { // because bounds are not necessarily a min or max, this cannot be answered // using them. notEq(col, X) with (X, Y) doesn't guarantee X is a value in col return rowsMightMatch } func (m *manifestEvalVisitor) VisitGreaterEqual(term iceberg.BoundTerm, lit iceberg.Literal) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.UpperBound == nil { return rowsCannotMatch } upper, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.UpperBound) if err != nil { panic(err) } if getCmpLiteral(upper)(lit, upper) == 1 { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitGreater(term iceberg.BoundTerm, lit iceberg.Literal) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.UpperBound == nil { return rowsCannotMatch } upper, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.UpperBound) if err != nil { panic(err) } if getCmpLiteral(upper)(lit, upper) >= 0 { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitLessEqual(term iceberg.BoundTerm, lit iceberg.Literal) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.LowerBound == nil { return rowsCannotMatch } lower, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.LowerBound) if err != nil { panic(err) } if getCmpLiteral(lower)(lit, lower) == -1 { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitLess(term iceberg.BoundTerm, lit iceberg.Literal) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.LowerBound == nil { return rowsCannotMatch } lower, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.LowerBound) if err != nil { panic(err) } if getCmpLiteral(lower)(lit, lower) <= 0 { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitStartsWith(term iceberg.BoundTerm, lit iceberg.Literal) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] var prefix string if val, ok := lit.(iceberg.TypedLiteral[string]); ok { prefix = val.Value() } else { prefix = string(lit.(iceberg.TypedLiteral[[]byte]).Value()) } lenPrefix := len(prefix) if field.LowerBound == nil { return rowsCannotMatch } lower, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.LowerBound) if err != nil { panic(err) } // truncate lower bound so that it's length is not greater than the length of prefix var v string switch l := lower.(type) { case iceberg.TypedLiteral[string]: v = l.Value() if len(v) > lenPrefix { v = v[:lenPrefix] } case iceberg.TypedLiteral[[]byte]: v = string(l.Value()) if len(v) > lenPrefix { v = v[:lenPrefix] } } if v > prefix { return rowsCannotMatch } if field.UpperBound == nil { return rowsCannotMatch } upper, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.UpperBound) if err != nil { panic(err) } switch u := upper.(type) { case iceberg.TypedLiteral[string]: v = u.Value() if len(v) > lenPrefix { v = v[:lenPrefix] } case iceberg.TypedLiteral[[]byte]: v = string(u.Value()) if len(v) > lenPrefix { v = v[:lenPrefix] } } if v < prefix { return rowsCannotMatch } return rowsMightMatch } func (m *manifestEvalVisitor) VisitNotStartsWith(term iceberg.BoundTerm, lit iceberg.Literal) bool { pos := term.Ref().Pos() field := m.partitionFields[pos] if field.ContainsNull || field.LowerBound == nil || field.UpperBound == nil { return rowsMightMatch } // NotStartsWith will match unless ALL values must start with the prefix. // this happens when the lower and upper bounds BOTH start with the prefix lower, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.LowerBound) if err != nil { panic(err) } upper, err := iceberg.LiteralFromBytes(term.Ref().Type(), *field.UpperBound) if err != nil { panic(err) } var prefix, lowerBound, upperBound string if val, ok := lit.(iceberg.TypedLiteral[string]); ok { prefix = val.Value() lowerBound, upperBound = lower.(iceberg.TypedLiteral[string]).Value(), upper.(iceberg.TypedLiteral[string]).Value() } else { prefix = string(lit.(iceberg.TypedLiteral[[]byte]).Value()) lowerBound = string(lower.(iceberg.TypedLiteral[[]byte]).Value()) upperBound = string(upper.(iceberg.TypedLiteral[[]byte]).Value()) } lenPrefix := len(prefix) if len(lowerBound) < lenPrefix { return rowsMightMatch } if lowerBound[:lenPrefix] == prefix { // if upper is shorter then upper can't start with the prefix if len(upperBound) < lenPrefix { return rowsMightMatch } if upperBound[:lenPrefix] == prefix { return rowsCannotMatch } } return rowsMightMatch } func (m *manifestEvalVisitor) VisitTrue() bool { return rowsMightMatch } func (m *manifestEvalVisitor) VisitFalse() bool { return rowsCannotMatch } func (m *manifestEvalVisitor) VisitUnbound(iceberg.UnboundPredicate) bool { panic("need bound predicate") } func (m *manifestEvalVisitor) VisitBound(pred iceberg.BoundPredicate) bool { return iceberg.VisitBoundPredicate(pred, m) } func (m *manifestEvalVisitor) VisitNot(child bool) bool { return !child } func (m *manifestEvalVisitor) VisitAnd(left, right bool) bool { return left && right } func (m *manifestEvalVisitor) VisitOr(left, right bool) bool { return left || right } type projectionEvaluator struct { spec iceberg.PartitionSpec schema *iceberg.Schema caseSensitive bool } func (*projectionEvaluator) VisitTrue() iceberg.BooleanExpression { return iceberg.AlwaysTrue{} } func (*projectionEvaluator) VisitFalse() iceberg.BooleanExpression { return iceberg.AlwaysFalse{} } func (*projectionEvaluator) VisitNot(child iceberg.BooleanExpression) iceberg.BooleanExpression { panic(fmt.Errorf("%w: cannot project 'not' expression, should be rewritten %s", iceberg.ErrInvalidArgument, child)) } func (*projectionEvaluator) VisitAnd(left, right iceberg.BooleanExpression) iceberg.BooleanExpression { return iceberg.NewAnd(left, right) } func (*projectionEvaluator) VisitOr(left, right iceberg.BooleanExpression) iceberg.BooleanExpression { return iceberg.NewOr(left, right) } func (*projectionEvaluator) VisitUnbound(pred iceberg.UnboundPredicate) iceberg.BooleanExpression { panic(fmt.Errorf("%w: cannot project unbound predicate: %s", iceberg.ErrInvalidArgument, pred)) } type inclusiveProjection struct{ projectionEvaluator } func (p *inclusiveProjection) Project(expr iceberg.BooleanExpression) (iceberg.BooleanExpression, error) { expr, err := iceberg.RewriteNotExpr(expr) if err != nil { return nil, err } bound, err := iceberg.BindExpr(p.schema, expr, p.caseSensitive) if err != nil { return nil, err } return iceberg.VisitExpr(bound, p) } func (p *inclusiveProjection) VisitBound(pred iceberg.BoundPredicate) iceberg.BooleanExpression { parts := p.spec.FieldsBySourceID(pred.Term().Ref().Field().ID) var result iceberg.BooleanExpression = iceberg.AlwaysTrue{} for _, part := range parts { // consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d) // projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0 // any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01' // // similarly, if partitioning by day(ts) and hour(ts), the more restrictive // projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and // hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02. inclProjection, err := part.Transform.Project(part.Name, pred) if err != nil { panic(err) } if inclProjection != nil { result = iceberg.NewAnd(result, inclProjection) } } return result } func newInclusiveProjection(s *iceberg.Schema, spec iceberg.PartitionSpec, caseSensitive bool) func(iceberg.BooleanExpression) (iceberg.BooleanExpression, error) { return (&inclusiveProjection{ projectionEvaluator: projectionEvaluator{ schema: s, spec: spec, caseSensitive: caseSensitive, }, }).Project } type metricsEvaluator struct { valueCounts map[int]int64 nullCounts map[int]int64 nanCounts map[int]int64 lowerBounds map[int][]byte upperBounds map[int][]byte } func (m *metricsEvaluator) VisitTrue() bool { return rowsMightMatch } func (m *metricsEvaluator) VisitFalse() bool { return rowsCannotMatch } func (m *metricsEvaluator) VisitNot(child bool) bool { panic(fmt.Errorf("%w: NOT should be rewritten %v", iceberg.ErrInvalidArgument, child)) } func (m *metricsEvaluator) VisitAnd(left, right bool) bool { return left && right } func (m *metricsEvaluator) VisitOr(left, right bool) bool { return left || right } func (m *metricsEvaluator) containsNullsOnly(id int) bool { valCount, ok := m.valueCounts[id] if !ok { return false } nullCount, ok := m.nullCounts[id] if !ok { return false } return valCount == nullCount } func (m *metricsEvaluator) containsNansOnly(id int) bool { nanCount, ok := m.nanCounts[id] if !ok { return false } valCount, ok := m.valueCounts[id] if !ok { return false } return nanCount == valCount } func (m *metricsEvaluator) isNan(v iceberg.Literal) bool { switch v := v.(type) { case iceberg.Float32Literal: return math.IsNaN(float64(v)) case iceberg.Float64Literal: return math.IsNaN(float64(v)) default: return false } } func newInclusiveMetricsEvaluator(s *iceberg.Schema, expr iceberg.BooleanExpression, caseSensitive bool, includeEmptyFiles bool, ) (func(iceberg.DataFile) (bool, error), error) { rewritten, err := iceberg.RewriteNotExpr(expr) if err != nil { return nil, err } bound, err := iceberg.BindExpr(s, rewritten, caseSensitive) if err != nil { return nil, err } return (&inclusiveMetricsEval{ st: s.AsStruct(), includeEmptyFiles: includeEmptyFiles, expr: bound, }).Eval, nil } func newParquetRowGroupStatsEvaluator(fileSchema *iceberg.Schema, expr iceberg.BooleanExpression, includeEmptyFiles bool, ) (func(*metadata.RowGroupMetaData, []int) (bool, error), error) { rewritten, err := iceberg.RewriteNotExpr(expr) if err != nil { return nil, err } return (&inclusiveMetricsEval{ st: fileSchema.AsStruct(), includeEmptyFiles: includeEmptyFiles, expr: rewritten, }).TestRowGroup, nil } type inclusiveMetricsEval struct { metricsEvaluator st iceberg.StructType expr iceberg.BooleanExpression includeEmptyFiles bool } func (m *inclusiveMetricsEval) TestRowGroup(rgmeta *metadata.RowGroupMetaData, colIndices []int) (bool, error) { if !m.includeEmptyFiles && rgmeta.NumRows() == 0 { return rowsCannotMatch, nil } m.valueCounts = make(map[int]int64) m.nullCounts = make(map[int]int64) m.nanCounts = nil m.lowerBounds = make(map[int][]byte) m.upperBounds = make(map[int][]byte) for _, c := range colIndices { colMeta, err := rgmeta.ColumnChunk(c) if err != nil { return false, err } if ok, err := colMeta.StatsSet(); !ok || err != nil { continue } stats, err := colMeta.Statistics() if err != nil { return false, err } fieldID := int(stats.Descr().SchemaNode().FieldID()) m.valueCounts[fieldID] = stats.NumValues() if stats.HasNullCount() { m.nullCounts[fieldID] = stats.NullCount() } if stats.HasMinMax() { m.lowerBounds[fieldID] = stats.EncodeMin() m.upperBounds[fieldID] = stats.EncodeMax() } } return iceberg.VisitExpr(m.expr, m) } func (m *inclusiveMetricsEval) Eval(file iceberg.DataFile) (bool, error) { if !m.includeEmptyFiles && file.Count() == 0 { return rowsCannotMatch, nil } m.valueCounts, m.nullCounts = file.ValueCounts(), file.NullValueCounts() m.nanCounts = file.NaNValueCounts() m.lowerBounds, m.upperBounds = file.LowerBoundValues(), file.UpperBoundValues() return iceberg.VisitExpr(m.expr, m) } func (m *inclusiveMetricsEval) mayContainNull(fieldID int) bool { if m.nullCounts == nil { return true } _, ok := m.nullCounts[fieldID] return ok } func (m *inclusiveMetricsEval) VisitUnbound(iceberg.UnboundPredicate) bool { panic("need bound predicate") } func (m *inclusiveMetricsEval) VisitBound(pred iceberg.BoundPredicate) bool { return iceberg.VisitBoundPredicate(pred, m) } func (m *inclusiveMetricsEval) VisitIsNull(t iceberg.BoundTerm) bool { fieldID := t.Ref().Field().ID if cnt, exists := m.nullCounts[fieldID]; exists && cnt == 0 { return rowsCannotMatch } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitNotNull(t iceberg.BoundTerm) bool { // no need to check whether the field is required because binding evaluates // that case if the column has no non-null values, the expression cannot match fieldID := t.Ref().Field().ID if m.containsNullsOnly(fieldID) { return rowsCannotMatch } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitIsNan(t iceberg.BoundTerm) bool { fieldID := t.Ref().Field().ID if cnt, exists := m.nanCounts[fieldID]; exists && cnt == 0 { return rowsCannotMatch } // when there's no nancounts information but we already know the column // contains null it's guaranteed that there's no nan value if m.containsNullsOnly(fieldID) { return rowsCannotMatch } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitNotNan(t iceberg.BoundTerm) bool { fieldID := t.Ref().Field().ID if m.containsNansOnly(fieldID) { return rowsCannotMatch } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitLess(t iceberg.BoundTerm, lit iceberg.Literal) bool { field := t.Ref().Field() fieldID := field.ID if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) { return rowsCannotMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) if err != nil { panic(err) } if m.isNan(lowerBound) { // nan indicates unreliable bounds return rowsMightMatch } if getCmpLiteral(lowerBound)(lowerBound, lit) >= 0 { return rowsCannotMatch } } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitLessEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool { field := t.Ref().Field() fieldID := field.ID if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) { return rowsCannotMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) if err != nil { panic(err) } if m.isNan(lowerBound) { // nan indicates unreliable bounds return rowsMightMatch } if getCmpLiteral(lowerBound)(lowerBound, lit) > 0 { return rowsCannotMatch } } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitGreater(t iceberg.BoundTerm, lit iceberg.Literal) bool { field := t.Ref().Field() fieldID := field.ID if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) { return rowsCannotMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) if err != nil { panic(err) } if getCmpLiteral(upperBound)(upperBound, lit) <= 0 { if m.isNan(upperBound) { return rowsMightMatch } return rowsCannotMatch } } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitGreaterEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool { field := t.Ref().Field() fieldID := field.ID if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) { return rowsCannotMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) if err != nil { panic(err) } if getCmpLiteral(upperBound)(upperBound, lit) < 0 { if m.isNan(upperBound) { return rowsMightMatch } return rowsCannotMatch } } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool { field := t.Ref().Field() fieldID := field.ID if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) { return rowsCannotMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } var cmp func(iceberg.Literal, iceberg.Literal) int if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) if err != nil { panic(err) } if m.isNan(lowerBound) { return rowsMightMatch } cmp = getCmpLiteral(lowerBound) if cmp(lowerBound, lit) == 1 { return rowsCannotMatch } } if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) if err != nil { panic(err) } if m.isNan(upperBound) { return rowsMightMatch } if cmp(upperBound, lit) == -1 { return rowsCannotMatch } } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitNotEqual(iceberg.BoundTerm, iceberg.Literal) bool { return rowsMightMatch } func (m *inclusiveMetricsEval) VisitIn(t iceberg.BoundTerm, s iceberg.Set[iceberg.Literal]) bool { field := t.Ref().Field() fieldID := field.ID if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) { return rowsCannotMatch } if s.Len() > inPredicateLimit { // skip evaluating the predicate if the number of values is too big return rowsMightMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } values := s.Members() if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) if err != nil { panic(lowerBound) } if m.isNan(lowerBound) { return rowsMightMatch } values = removeBoundCheck(lowerBound, values, 1) if len(values) == 0 { return rowsCannotMatch } } if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) if err != nil { panic(err) } if m.isNan(upperBound) { return rowsMightMatch } values = removeBoundCheck(upperBound, values, -1) if len(values) == 0 { return rowsCannotMatch } } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitNotIn(iceberg.BoundTerm, iceberg.Set[iceberg.Literal]) bool { // because the bounds are not necessarily a min or max value, this cannot be // answered using them. notIn(col, {X, ...}) with (XX, Y) doesn't guarantee that // X is a value in col return rowsMightMatch } func (m *inclusiveMetricsEval) VisitStartsWith(t iceberg.BoundTerm, lit iceberg.Literal) bool { field := t.Ref().Field() fieldID := field.ID if m.containsNullsOnly(fieldID) { return rowsCannotMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } var prefix string if val, ok := lit.(iceberg.TypedLiteral[string]); ok { prefix = val.Value() } else { prefix = string(lit.(iceberg.TypedLiteral[[]byte]).Value()) } lenPrefix := len(prefix) if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) if err != nil { panic(err) } var v string switch l := lowerBound.(type) { case iceberg.TypedLiteral[string]: v = l.Value() case iceberg.TypedLiteral[[]byte]: v = string(l.Value()) } if len(v) > lenPrefix { v = v[:lenPrefix] } if len(v) > 0 && v > prefix { return rowsCannotMatch } } if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) if err != nil { panic(err) } var v string switch u := upperBound.(type) { case iceberg.TypedLiteral[string]: v = u.Value() case iceberg.TypedLiteral[[]byte]: v = string(u.Value()) } if len(v) > lenPrefix { v = v[:lenPrefix] } if len(v) > 0 && v < prefix { return rowsCannotMatch } } return rowsMightMatch } func (m *inclusiveMetricsEval) VisitNotStartsWith(t iceberg.BoundTerm, lit iceberg.Literal) bool { field := t.Ref().Field() fieldID := field.ID if m.mayContainNull(fieldID) { return rowsMightMatch } if _, ok := field.Type.(iceberg.PrimitiveType); !ok { panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s", iceberg.ErrInvalidTypeString, field.Type)) } // not_starts_with will match unless all values must start with the prefix. // this happens when the lower and upper bounds both start with the prefix lowerBoundBytes, upperBoundBytes := m.lowerBounds[fieldID], m.upperBounds[fieldID] if lowerBoundBytes != nil && upperBoundBytes != nil { lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) if err != nil { panic(err) } upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) if err != nil { panic(err) } var prefix, lower, upper string if val, ok := lit.(iceberg.TypedLiteral[string]); ok { prefix = val.Value() lower, upper = lowerBound.(iceberg.TypedLiteral[string]).Value(), upperBound.(iceberg.TypedLiteral[string]).Value() } else { prefix = string(lit.(iceberg.TypedLiteral[[]byte]).Value()) lower, upper = string(lowerBound.(iceberg.TypedLiteral[[]byte]).Value()), string(upperBound.(iceberg.TypedLiteral[[]byte]).Value()) } lenPrefix := len(prefix) if len(lower) < lenPrefix { return rowsMightMatch } if lower[:lenPrefix] == prefix { if len(upper) < lenPrefix { return rowsMightMatch } if upper[:lenPrefix] == prefix { return rowsCannotMatch } } } return rowsMightMatch }