arrow/array/decimal.go (355 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array
import (
"bytes"
"fmt"
"reflect"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/decimal"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/json"
)
type baseDecimal[T interface {
decimal.DecimalTypes
decimal.Num[T]
}] struct {
array
values []T
}
func newDecimalData[T interface {
decimal.DecimalTypes
decimal.Num[T]
}](data arrow.ArrayData) *baseDecimal[T] {
a := &baseDecimal[T]{}
a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
func (a *baseDecimal[T]) Value(i int) T { return a.values[i] }
func (a *baseDecimal[T]) ValueStr(i int) string {
if a.IsNull(i) {
return NullValueStr
}
return a.GetOneForMarshal(i).(string)
}
func (a *baseDecimal[T]) Values() []T { return a.values }
func (a *baseDecimal[T]) String() string {
o := new(strings.Builder)
o.WriteString("[")
for i := 0; i < a.Len(); i++ {
if i > 0 {
fmt.Fprintf(o, " ")
}
switch {
case a.IsNull(i):
o.WriteString(NullValueStr)
default:
fmt.Fprintf(o, "%v", a.Value(i))
}
}
o.WriteString("]")
return o.String()
}
func (a *baseDecimal[T]) setData(data *Data) {
a.array.setData(data)
vals := data.buffers[1]
if vals != nil {
a.values = arrow.GetData[T](vals.Bytes())
beg := a.array.data.offset
end := beg + a.array.data.length
a.values = a.values[beg:end]
}
}
func (a *baseDecimal[T]) GetOneForMarshal(i int) any {
if a.IsNull(i) {
return nil
}
typ := a.DataType().(arrow.DecimalType)
n, scale := a.Value(i), typ.GetScale()
return n.ToBigFloat(scale).Text('g', int(typ.GetPrecision()))
}
func (a *baseDecimal[T]) MarshalJSON() ([]byte, error) {
vals := make([]any, a.Len())
for i := 0; i < a.Len(); i++ {
vals[i] = a.GetOneForMarshal(i)
}
return json.Marshal(vals)
}
func arrayEqualDecimal[T interface {
decimal.DecimalTypes
decimal.Num[T]
}](left, right *baseDecimal[T]) bool {
for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
if left.Value(i) != right.Value(i) {
return false
}
}
return true
}
type Decimal32 = baseDecimal[decimal.Decimal32]
func NewDecimal32Data(data arrow.ArrayData) *Decimal32 {
return newDecimalData[decimal.Decimal32](data)
}
type Decimal64 = baseDecimal[decimal.Decimal64]
func NewDecimal64Data(data arrow.ArrayData) *Decimal64 {
return newDecimalData[decimal.Decimal64](data)
}
type Decimal128 = baseDecimal[decimal.Decimal128]
func NewDecimal128Data(data arrow.ArrayData) *Decimal128 {
return newDecimalData[decimal.Decimal128](data)
}
type Decimal256 = baseDecimal[decimal.Decimal256]
func NewDecimal256Data(data arrow.ArrayData) *Decimal256 {
return newDecimalData[decimal.Decimal256](data)
}
type (
Decimal32Builder = baseDecimalBuilder[decimal.Decimal32]
Decimal64Builder = baseDecimalBuilder[decimal.Decimal64]
Decimal128Builder struct {
*baseDecimalBuilder[decimal.Decimal128]
}
)
func (b *Decimal128Builder) NewDecimal128Array() *Decimal128 {
return b.NewDecimalArray()
}
type Decimal256Builder struct {
*baseDecimalBuilder[decimal.Decimal256]
}
func (b *Decimal256Builder) NewDecimal256Array() *Decimal256 {
return b.NewDecimalArray()
}
type baseDecimalBuilder[T interface {
decimal.DecimalTypes
decimal.Num[T]
}] struct {
builder
traits decimal.Traits[T]
dtype arrow.DecimalType
data *memory.Buffer
rawData []T
}
func newDecimalBuilder[T interface {
decimal.DecimalTypes
decimal.Num[T]
}, DT arrow.DecimalType](mem memory.Allocator, dtype DT) *baseDecimalBuilder[T] {
bdb := &baseDecimalBuilder[T]{
builder: builder{mem: mem},
dtype: dtype,
}
bdb.builder.refCount.Add(1)
return bdb
}
func (b *baseDecimalBuilder[T]) Type() arrow.DataType { return b.dtype }
func (b *baseDecimalBuilder[T]) Release() {
debug.Assert(b.refCount.Load() > 0, "too many releases")
if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
}
if b.data != nil {
b.data.Release()
b.data, b.rawData = nil, nil
}
}
}
func (b *baseDecimalBuilder[T]) Append(v T) {
b.Reserve(1)
b.UnsafeAppend(v)
}
func (b *baseDecimalBuilder[T]) UnsafeAppend(v T) {
bitutil.SetBit(b.nullBitmap.Bytes(), b.length)
b.rawData[b.length] = v
b.length++
}
func (b *baseDecimalBuilder[T]) AppendNull() {
b.Reserve(1)
b.UnsafeAppendBoolToBitmap(false)
}
func (b *baseDecimalBuilder[T]) AppendNulls(n int) {
for i := 0; i < n; i++ {
b.AppendNull()
}
}
func (b *baseDecimalBuilder[T]) AppendEmptyValue() {
var empty T
b.Append(empty)
}
func (b *baseDecimalBuilder[T]) AppendEmptyValues(n int) {
for i := 0; i < n; i++ {
b.AppendEmptyValue()
}
}
func (b *baseDecimalBuilder[T]) UnsafeAppendBoolToBitmap(isValid bool) {
if isValid {
bitutil.SetBit(b.nullBitmap.Bytes(), b.length)
} else {
b.nulls++
}
b.length++
}
func (b *baseDecimalBuilder[T]) AppendValues(v []T, valid []bool) {
if len(v) != len(valid) && len(valid) != 0 {
panic("len(v) != len(valid) && len(valid) != 0")
}
if len(v) == 0 {
return
}
b.Reserve(len(v))
if len(v) > 0 {
copy(b.rawData[b.length:], v)
}
b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
}
func (b *baseDecimalBuilder[T]) init(capacity int) {
b.builder.init(capacity)
b.data = memory.NewResizableBuffer(b.mem)
bytesN := int(reflect.TypeFor[T]().Size()) * capacity
b.data.Resize(bytesN)
b.rawData = arrow.GetData[T](b.data.Bytes())
}
func (b *baseDecimalBuilder[T]) Reserve(n int) {
b.builder.reserve(n, b.Resize)
}
func (b *baseDecimalBuilder[T]) Resize(n int) {
nBuilder := n
if n < minBuilderCapacity {
n = minBuilderCapacity
}
if b.capacity == 0 {
b.init(n)
} else {
b.builder.resize(nBuilder, b.init)
b.data.Resize(b.traits.BytesRequired(n))
b.rawData = arrow.GetData[T](b.data.Bytes())
}
}
func (b *baseDecimalBuilder[T]) NewDecimalArray() (a *baseDecimal[T]) {
data := b.newData()
a = newDecimalData[T](data)
data.Release()
return
}
func (b *baseDecimalBuilder[T]) NewArray() arrow.Array {
return b.NewDecimalArray()
}
func (b *baseDecimalBuilder[T]) newData() (data *Data) {
bytesRequired := b.traits.BytesRequired(b.length)
if bytesRequired > 0 && bytesRequired < b.data.Len() {
// trim buffers
b.data.Resize(bytesRequired)
}
data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, b.data}, nil, b.nulls, 0)
b.reset()
if b.data != nil {
b.data.Release()
b.data, b.rawData = nil, nil
}
return
}
func (b *baseDecimalBuilder[T]) AppendValueFromString(s string) error {
if s == NullValueStr {
b.AppendNull()
return nil
}
val, err := b.traits.FromString(s, b.dtype.GetPrecision(), b.dtype.GetScale())
if err != nil {
b.AppendNull()
return err
}
b.Append(val)
return nil
}
func (b *baseDecimalBuilder[T]) UnmarshalOne(dec *json.Decoder) error {
t, err := dec.Token()
if err != nil {
return err
}
var token T
switch v := t.(type) {
case float64:
token, err = b.traits.FromFloat64(v, b.dtype.GetPrecision(), b.dtype.GetScale())
if err != nil {
return err
}
b.Append(token)
case string:
token, err = b.traits.FromString(v, b.dtype.GetPrecision(), b.dtype.GetScale())
if err != nil {
return err
}
b.Append(token)
case json.Number:
token, err = b.traits.FromString(v.String(), b.dtype.GetPrecision(), b.dtype.GetScale())
if err != nil {
return err
}
b.Append(token)
case nil:
b.AppendNull()
default:
return &json.UnmarshalTypeError{
Value: fmt.Sprint(t),
Type: reflect.TypeFor[T](),
Offset: dec.InputOffset(),
}
}
return nil
}
func (b *baseDecimalBuilder[T]) Unmarshal(dec *json.Decoder) error {
for dec.More() {
if err := b.UnmarshalOne(dec); err != nil {
return err
}
}
return nil
}
func (b *baseDecimalBuilder[T]) UnmarshalJSON(data []byte) error {
dec := json.NewDecoder(bytes.NewReader(data))
t, err := dec.Token()
if err != nil {
return err
}
if delim, ok := t.(json.Delim); !ok || delim != '[' {
return fmt.Errorf("decimal builder must unpack from json array, found %s", delim)
}
return b.Unmarshal(dec)
}
func NewDecimal32Builder(mem memory.Allocator, dtype *arrow.Decimal32Type) *Decimal32Builder {
b := newDecimalBuilder[decimal.Decimal32](mem, dtype)
b.traits = decimal.Dec32Traits
return b
}
func NewDecimal64Builder(mem memory.Allocator, dtype *arrow.Decimal64Type) *Decimal64Builder {
b := newDecimalBuilder[decimal.Decimal64](mem, dtype)
b.traits = decimal.Dec64Traits
return b
}
func NewDecimal128Builder(mem memory.Allocator, dtype *arrow.Decimal128Type) *Decimal128Builder {
b := newDecimalBuilder[decimal.Decimal128](mem, dtype)
b.traits = decimal.Dec128Traits
return &Decimal128Builder{b}
}
func NewDecimal256Builder(mem memory.Allocator, dtype *arrow.Decimal256Type) *Decimal256Builder {
b := newDecimalBuilder[decimal.Decimal256](mem, dtype)
b.traits = decimal.Dec256Traits
return &Decimal256Builder{b}
}
var (
_ arrow.Array = (*Decimal32)(nil)
_ arrow.Array = (*Decimal64)(nil)
_ arrow.Array = (*Decimal128)(nil)
_ arrow.Array = (*Decimal256)(nil)
_ Builder = (*Decimal32Builder)(nil)
_ Builder = (*Decimal64Builder)(nil)
_ Builder = (*Decimal128Builder)(nil)
_ Builder = (*Decimal256Builder)(nil)
_ arrow.TypedArray[decimal.Decimal32] = (*Decimal32)(nil)
_ arrow.TypedArray[decimal.Decimal64] = (*Decimal64)(nil)
_ arrow.TypedArray[decimal.Decimal128] = (*Decimal128)(nil)
_ arrow.TypedArray[decimal.Decimal256] = (*Decimal256)(nil)
)