in table/arrow_utils.go [707:771]
// castIfNeeded returns vals in the Arrow type required by the projected field,
// casting only when the data read from the file does not already match.
//
// The file-side field is looked up in a.fileSchema by field.ID; a missing ID
// panics. The remaining steps are, in order:
//
//   - If the file field's type is not an iceberg.PrimitiveType, vals is
//     retained and returned as-is.
//   - If the requested type differs from the file type, the file type is
//     promoted with iceberg.PromoteType, converted to an Arrow type (passed
//     through ensureSmallArrowTypes when a.useLargeTypes is false) and vals is
//     cast to it with safe cast options.
//   - If the Arrow type of the requested field already equals vals' type,
//     vals is retained and returned as-is.
//   - Timestamp and timestamptz fields are only converted to a microsecond
//     target. Nanosecond input requires a.downcastNsTimestamp and uses unsafe
//     cast options; every other accepted unit uses safe cast options. Any
//     other combination panics with an "unsupported schema projection" error.
//   - All other field types are cast with safe cast options.
//
// Results of the fallible helpers are unwrapped through retOrPanic.
func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals arrow.Array) arrow.Array {
	srcField, found := a.fileSchema.FindFieldByID(field.ID)
	if !found {
		panic(fmt.Errorf("could not find field id %d in schema", field.ID))
	}

	srcPrim, isPrim := srcField.Type.(iceberg.PrimitiveType)
	if !isPrim {
		vals.Retain()

		return vals
	}

	// Local shorthands for the two flavours of cast used below.
	safeCast := func(dt arrow.DataType) arrow.Array {
		return retOrPanic(compute.CastArray(a.ctx, vals, compute.SafeCastOptions(dt)))
	}
	unsafeCast := func(dt arrow.DataType) arrow.Array {
		return retOrPanic(compute.CastArray(a.ctx, vals, compute.UnsafeCastOptions(dt)))
	}

	// The table schema asks for a different type than the file holds:
	// promote the file type and cast to the promoted Arrow type.
	if !field.Type.Equals(srcPrim) {
		promoted := retOrPanic(iceberg.PromoteType(srcField.Type, field.Type))
		promotedArrow := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs, a.useLargeTypes))
		if !a.useLargeTypes {
			promotedArrow = retOrPanic(ensureSmallArrowTypes(promotedArrow))
		}

		return safeCast(promotedArrow)
	}

	wantType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs, a.useLargeTypes))
	haveType := vals.DataType()
	if arrow.TypeEqual(wantType, haveType) {
		// Nothing to convert; hand back the same array with an extra reference.
		vals.Retain()

		return vals
	}

	switch field.Type.(type) {
	case iceberg.TimestampType:
		want, wantOK := wantType.(*arrow.TimestampType)
		have, haveOK := haveType.(*arrow.TimestampType)
		// Only zone-less -> zone-less conversions to microseconds are handled.
		if wantOK && haveOK && want.TimeZone == "" && have.TimeZone == "" && want.Unit == arrow.Microsecond {
			switch have.Unit {
			case arrow.Nanosecond:
				// Dropping sub-microsecond precision must be opted into.
				if a.downcastNsTimestamp {
					return unsafeCast(want)
				}
			case arrow.Second, arrow.Millisecond:
				return safeCast(want)
			}
		}
	case iceberg.TimestampTzType:
		want, wantOK := wantType.(*arrow.TimestampType)
		have, haveOK := haveType.(*arrow.TimestampType)
		// The source zone has to be one of the accepted spellings of UTC.
		if wantOK && haveOK && want.TimeZone == "UTC" &&
			slices.Contains(utcAliases, have.TimeZone) && want.Unit == arrow.Microsecond {
			if have.Unit != arrow.Nanosecond {
				return safeCast(want)
			}
			// Dropping sub-microsecond precision must be opted into.
			if a.downcastNsTimestamp {
				return unsafeCast(want)
			}
		}
	default:
		return safeCast(wantType)
	}

	// Reached only by timestamp fields whose source/target pair was rejected.
	panic(fmt.Errorf("unsupported schema projection from %s to %s",
		haveType, wantType))
}