arrow/array/encoded.go (355 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package array import ( "bytes" "fmt" "math" "reflect" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/encoded" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/json" "github.com/apache/arrow-go/v18/internal/utils" ) // RunEndEncoded represents an array containing two children: // an array of int32 values defining the ends of each run of values // and an array of values type RunEndEncoded struct { array ends arrow.Array values arrow.Array } func NewRunEndEncodedArray(runEnds, values arrow.Array, logicalLength, offset int) *RunEndEncoded { data := NewData(arrow.RunEndEncodedOf(runEnds.DataType(), values.DataType()), logicalLength, []*memory.Buffer{nil}, []arrow.ArrayData{runEnds.Data(), values.Data()}, 0, offset) defer data.Release() return NewRunEndEncodedData(data) } func NewRunEndEncodedData(data arrow.ArrayData) *RunEndEncoded { r := &RunEndEncoded{} r.refCount.Add(1) r.setData(data.(*Data)) return r } func (r *RunEndEncoded) Values() arrow.Array { return r.values } func (r *RunEndEncoded) RunEndsArr() arrow.Array { return r.ends } func (r *RunEndEncoded) Retain() { r.array.Retain() r.values.Retain() r.ends.Retain() } func (r *RunEndEncoded) Release() { r.array.Release() r.values.Release() r.ends.Release() } // LogicalValuesArray returns an array holding the values of each // run, only over the range of run values inside the logical offset/length // range of the parent array. // // # Example // // For this array: // // RunEndEncoded: { Offset: 150, Length: 1500 } // RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ] // Values: [ "a", "b", "c", "d", "e", "f", "g", "h" ] // // LogicalValuesArray will return the following array: // // [ "f", "g" ] // // This is because the offset of 150 tells it to skip the values until // "f" which corresponds with the logical offset (the run from 10 - 1000), // and stops after "g" because the length + offset goes to 1650 which is // within the run from 1000 - 1750, corresponding to the "g" value. // // # Note // // The return from this needs to be Released. func (r *RunEndEncoded) LogicalValuesArray() arrow.Array { physOffset := r.GetPhysicalOffset() physLength := r.GetPhysicalLength() data := NewSliceData(r.data.Children()[1], int64(physOffset), int64(physOffset+physLength)) defer data.Release() return MakeFromData(data) } // LogicalRunEndsArray returns an array holding the logical indexes // of each run end, only over the range of run end values relative // to the logical offset/length range of the parent array. // // For arrays with an offset, this is not a slice of the existing // internal run ends array. Instead a new array is created with run-ends // that are adjusted so the new array can have an offset of 0. As a result // this method can be expensive to call for an array with a non-zero offset. // // # Example // // For this array: // // RunEndEncoded: { Offset: 150, Length: 1500 } // RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ] // Values: [ "a", "b", "c", "d", "e", "f", "g", "h" ] // // LogicalRunEndsArray will return the following array: // // [ 850, 1500 ] // // This is because the offset of 150 tells us to skip all run-ends less // than 150 (by finding the physical offset), and we adjust the run-ends // accordingly (1000 - 150 = 850). The logical length of the array is 1500, // so we know we don't want to go past the 1750 run end. Thus the last // run-end is determined by doing: min(1750 - 150, 1500) = 1500. // // # Note // // The return from this needs to be Released func (r *RunEndEncoded) LogicalRunEndsArray(mem memory.Allocator) arrow.Array { physOffset := r.GetPhysicalOffset() physLength := r.GetPhysicalLength() if r.data.offset == 0 { data := NewSliceData(r.data.childData[0], 0, int64(physLength)) defer data.Release() return MakeFromData(data) } bldr := NewBuilder(mem, r.data.childData[0].DataType()) defer bldr.Release() bldr.Resize(physLength) switch e := r.ends.(type) { case *Int16: for _, v := range e.Int16Values()[physOffset : physOffset+physLength] { v -= int16(r.data.offset) v = int16(utils.Min(int(v), r.data.length)) bldr.(*Int16Builder).Append(v) } case *Int32: for _, v := range e.Int32Values()[physOffset : physOffset+physLength] { v -= int32(r.data.offset) v = int32(utils.Min(int(v), r.data.length)) bldr.(*Int32Builder).Append(v) } case *Int64: for _, v := range e.Int64Values()[physOffset : physOffset+physLength] { v -= int64(r.data.offset) v = int64(utils.Min(int(v), r.data.length)) bldr.(*Int64Builder).Append(v) } } return bldr.NewArray() } func (r *RunEndEncoded) setData(data *Data) { if len(data.childData) != 2 { panic(fmt.Errorf("%w: arrow/array: RLE array must have exactly 2 children", arrow.ErrInvalid)) } debug.Assert(data.dtype.ID() == arrow.RUN_END_ENCODED, "invalid type for RunLengthEncoded") if !data.dtype.(*arrow.RunEndEncodedType).ValidRunEndsType(data.childData[0].DataType()) { panic(fmt.Errorf("%w: arrow/array: run ends array must be int16, int32, or int64", arrow.ErrInvalid)) } if data.childData[0].NullN() > 0 { panic(fmt.Errorf("%w: arrow/array: run ends array cannot contain nulls", arrow.ErrInvalid)) } r.array.setData(data) r.ends = MakeFromData(r.data.childData[0]) r.values = MakeFromData(r.data.childData[1]) } func (r *RunEndEncoded) GetPhysicalOffset() int { return encoded.FindPhysicalOffset(r.data) } func (r *RunEndEncoded) GetPhysicalLength() int { return encoded.GetPhysicalLength(r.data) } // GetPhysicalIndex can be used to get the run-encoded value instead of costly LogicalValuesArray // in the following way: // // r.Values().(valuetype).Value(r.GetPhysicalIndex(i)) func (r *RunEndEncoded) GetPhysicalIndex(i int) int { return encoded.FindPhysicalIndex(r.data, i+r.data.offset) } // ValueStr will return the str representation of the value at the logical offset i. func (r *RunEndEncoded) ValueStr(i int) string { return r.values.ValueStr(r.GetPhysicalIndex(i)) } func (r *RunEndEncoded) String() string { var buf bytes.Buffer buf.WriteByte('[') for i := 0; i < r.ends.Len(); i++ { if i != 0 { buf.WriteByte(',') } value := r.values.GetOneForMarshal(i) if byts, ok := value.(json.RawMessage); ok { value = string(byts) } fmt.Fprintf(&buf, "{%d -> %v}", r.ends.GetOneForMarshal(i), value) } buf.WriteByte(']') return buf.String() } func (r *RunEndEncoded) GetOneForMarshal(i int) interface{} { return r.values.GetOneForMarshal(r.GetPhysicalIndex(i)) } func (r *RunEndEncoded) MarshalJSON() ([]byte, error) { var buf bytes.Buffer enc := json.NewEncoder(&buf) buf.WriteByte('[') for i := 0; i < r.Len(); i++ { if i != 0 { buf.WriteByte(',') } if err := enc.Encode(r.GetOneForMarshal(i)); err != nil { return nil, err } } buf.WriteByte(']') return buf.Bytes(), nil } func arrayRunEndEncodedEqual(l, r *RunEndEncoded) bool { // types were already checked before getting here, so we know // the encoded types are equal mr := encoded.NewMergedRuns([2]arrow.Array{l, r}) for mr.Next() { lIndex := mr.IndexIntoArray(0) rIndex := mr.IndexIntoArray(1) if !SliceEqual(l.values, lIndex, lIndex+1, r.values, rIndex, rIndex+1) { return false } } return true } func arrayRunEndEncodedApproxEqual(l, r *RunEndEncoded, opt equalOption) bool { // types were already checked before getting here, so we know // the encoded types are equal mr := encoded.NewMergedRuns([2]arrow.Array{l, r}) for mr.Next() { lIndex := mr.IndexIntoArray(0) rIndex := mr.IndexIntoArray(1) if !sliceApproxEqual(l.values, lIndex, lIndex+1, r.values, rIndex, rIndex+1, opt) { return false } } return true } type RunEndEncodedBuilder struct { builder dt arrow.DataType runEnds Builder values Builder maxRunEnd uint64 // currently, mixing AppendValueFromString & UnmarshalOne is unsupported lastUnmarshalled interface{} unmarshalled bool // tracks if Unmarshal was called (in case lastUnmarshalled is nil) lastStr *string } func NewRunEndEncodedBuilder(mem memory.Allocator, runEnds, encoded arrow.DataType) *RunEndEncodedBuilder { dt := arrow.RunEndEncodedOf(runEnds, encoded) if !dt.ValidRunEndsType(runEnds) { panic("arrow/ree: invalid runEnds type for run length encoded array") } var maxEnd uint64 switch runEnds.ID() { case arrow.INT16: maxEnd = math.MaxInt16 case arrow.INT32: maxEnd = math.MaxInt32 case arrow.INT64: maxEnd = math.MaxInt64 } reb := &RunEndEncodedBuilder{ builder: builder{mem: mem}, dt: dt, runEnds: NewBuilder(mem, runEnds), values: NewBuilder(mem, encoded), maxRunEnd: maxEnd, lastUnmarshalled: nil, } reb.builder.refCount.Add(1) return reb } func (b *RunEndEncodedBuilder) Type() arrow.DataType { return b.dt } func (b *RunEndEncodedBuilder) Release() { debug.Assert(b.refCount.Load() > 0, "too many releases") if b.refCount.Add(-1) == 0 { b.values.Release() b.runEnds.Release() } } func (b *RunEndEncodedBuilder) addLength(n uint64) { if uint64(b.length)+n > b.maxRunEnd { panic(fmt.Errorf("%w: %s array length must fit be less than %d", arrow.ErrInvalid, b.dt, b.maxRunEnd)) } b.length += int(n) } func (b *RunEndEncodedBuilder) finishRun() { b.lastUnmarshalled = nil b.lastStr = nil b.unmarshalled = false if b.length == 0 { return } switch bldr := b.runEnds.(type) { case *Int16Builder: bldr.Append(int16(b.length)) case *Int32Builder: bldr.Append(int32(b.length)) case *Int64Builder: bldr.Append(int64(b.length)) } } func (b *RunEndEncodedBuilder) ValueBuilder() Builder { return b.values } func (b *RunEndEncodedBuilder) Append(n uint64) { b.finishRun() b.addLength(n) } func (b *RunEndEncodedBuilder) AppendRuns(runs []uint64) { for _, r := range runs { b.finishRun() b.addLength(r) } } func (b *RunEndEncodedBuilder) ContinueRun(n uint64) { b.addLength(n) } func (b *RunEndEncodedBuilder) AppendNull() { b.finishRun() b.values.AppendNull() b.addLength(1) } func (b *RunEndEncodedBuilder) AppendNulls(n int) { for i := 0; i < n; i++ { b.AppendNull() } } func (b *RunEndEncodedBuilder) NullN() int { return UnknownNullCount } func (b *RunEndEncodedBuilder) AppendEmptyValue() { b.AppendNull() } func (b *RunEndEncodedBuilder) AppendEmptyValues(n int) { b.AppendNulls(n) } func (b *RunEndEncodedBuilder) Reserve(n int) { b.values.Reserve(n) b.runEnds.Reserve(n) } func (b *RunEndEncodedBuilder) Resize(n int) { b.values.Resize(n) b.runEnds.Resize(n) } func (b *RunEndEncodedBuilder) NewRunEndEncodedArray() *RunEndEncoded { data := b.newData() defer data.Release() return NewRunEndEncodedData(data) } func (b *RunEndEncodedBuilder) NewArray() arrow.Array { return b.NewRunEndEncodedArray() } func (b *RunEndEncodedBuilder) newData() (data *Data) { b.finishRun() values := b.values.NewArray() defer values.Release() runEnds := b.runEnds.NewArray() defer runEnds.Release() data = NewData( b.dt, b.length, []*memory.Buffer{}, []arrow.ArrayData{runEnds.Data(), values.Data()}, 0, 0) b.reset() return } // AppendValueFromString can't be used in conjunction with UnmarshalOne func (b *RunEndEncodedBuilder) AppendValueFromString(s string) error { // we don't support mixing AppendValueFromString & UnmarshalOne if b.unmarshalled { return fmt.Errorf("%w: mixing AppendValueFromString & UnmarshalOne not yet implemented", arrow.ErrNotImplemented) } if s == NullValueStr { b.AppendNull() return nil } if b.lastStr != nil && s == *b.lastStr { b.ContinueRun(1) return nil } b.Append(1) lastStr := s b.lastStr = &lastStr return b.ValueBuilder().AppendValueFromString(s) } // UnmarshalOne can't be used in conjunction with AppendValueFromString func (b *RunEndEncodedBuilder) UnmarshalOne(dec *json.Decoder) error { // we don't support mixing AppendValueFromString & UnmarshalOne if b.lastStr != nil { return fmt.Errorf("%w: mixing AppendValueFromString & UnmarshalOne not yet implemented", arrow.ErrNotImplemented) } var value interface{} if err := dec.Decode(&value); err != nil { return err } // if we unmarshalled the same value as the previous one, we want to // continue the run. However, there's an edge case. At the start of // unmarshalling, lastUnmarshalled will be nil, but we might get // nil as the first value we unmarshal. In that case we want to // make sure we add a new run instead. We can detect that case by // checking that the number of runEnds matches the number of values // we have, which means no matter what we have to start a new run if reflect.DeepEqual(value, b.lastUnmarshalled) && (value != nil || b.runEnds.Len() != b.values.Len()) { b.ContinueRun(1) return nil } data, err := json.Marshal(value) if err != nil { return err } b.Append(1) b.lastUnmarshalled = value b.unmarshalled = true return b.ValueBuilder().UnmarshalOne(json.NewDecoder(bytes.NewReader(data))) } // Unmarshal can't be used in conjunction with AppendValueFromString (as it calls UnmarshalOne) func (b *RunEndEncodedBuilder) Unmarshal(dec *json.Decoder) error { b.finishRun() for dec.More() { if err := b.UnmarshalOne(dec); err != nil { return err } } return nil } // UnmarshalJSON can't be used in conjunction with AppendValueFromString (as it calls UnmarshalOne) func (b *RunEndEncodedBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '[' { return fmt.Errorf("list builder must unpack from json array, found %s", delim) } return b.Unmarshal(dec) } var ( _ arrow.Array = (*RunEndEncoded)(nil) _ Builder = (*RunEndEncodedBuilder)(nil) )