arrow/array/timestamp.go (292 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package array import ( "bytes" "fmt" "reflect" "strings" "time" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/bitutil" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/json" ) // Timestamp represents an immutable sequence of arrow.Timestamp values. type Timestamp struct { array values []arrow.Timestamp } // NewTimestampData creates a new Timestamp from Data. func NewTimestampData(data arrow.ArrayData) *Timestamp { a := &Timestamp{} a.refCount.Add(1) a.setData(data.(*Data)) return a } // Reset resets the array for re-use. func (a *Timestamp) Reset(data *Data) { a.setData(data) } // Value returns the value at the specified index. func (a *Timestamp) Value(i int) arrow.Timestamp { return a.values[i] } func (a *Timestamp) Values() []arrow.Timestamp { return a.values } // TimestampValues returns the values. func (a *Timestamp) TimestampValues() []arrow.Timestamp { return a.Values() } // String returns a string representation of the array. func (a *Timestamp) String() string { o := new(strings.Builder) o.WriteString("[") for i, v := range a.values { if i > 0 { fmt.Fprintf(o, " ") } switch { case a.IsNull(i): o.WriteString(NullValueStr) default: fmt.Fprintf(o, "%v", v) } } o.WriteString("]") return o.String() } func (a *Timestamp) setData(data *Data) { a.array.setData(data) vals := data.buffers[1] if vals != nil { a.values = arrow.TimestampTraits.CastFromBytes(vals.Bytes()) beg := a.array.data.offset end := beg + a.array.data.length a.values = a.values[beg:end] } } func (a *Timestamp) ValueStr(i int) string { if a.IsNull(i) { return NullValueStr } toTime, _ := a.DataType().(*arrow.TimestampType).GetToTimeFunc() return toTime(a.values[i]).Format("2006-01-02 15:04:05.999999999Z0700") } func (a *Timestamp) GetOneForMarshal(i int) interface{} { if val := a.ValueStr(i); val != NullValueStr { return val } return nil } func (a *Timestamp) MarshalJSON() ([]byte, error) { vals := make([]interface{}, a.Len()) for i := range a.values { vals[i] = a.GetOneForMarshal(i) } return json.Marshal(vals) } func arrayEqualTimestamp(left, right *Timestamp) bool { for i := 0; i < left.Len(); i++ { if left.IsNull(i) { continue } if left.Value(i) != right.Value(i) { return false } } return true } type TimestampBuilder struct { builder dtype *arrow.TimestampType data *memory.Buffer rawData []arrow.Timestamp } func NewTimestampBuilder(mem memory.Allocator, dtype *arrow.TimestampType) *TimestampBuilder { tb := &TimestampBuilder{builder: builder{mem: mem}, dtype: dtype} tb.refCount.Add(1) return tb } func (b *TimestampBuilder) Type() arrow.DataType { return b.dtype } // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. func (b *TimestampBuilder) Release() { debug.Assert(b.refCount.Load() > 0, "too many releases") if b.refCount.Add(-1) == 0 { if b.nullBitmap != nil { b.nullBitmap.Release() b.nullBitmap = nil } if b.data != nil { b.data.Release() b.data = nil b.rawData = nil } } } func (b *TimestampBuilder) AppendTime(t time.Time) { ts, err := arrow.TimestampFromTime(t, b.dtype.Unit) if err != nil { panic(err) } b.Append(ts) } func (b *TimestampBuilder) Append(v arrow.Timestamp) { b.Reserve(1) b.UnsafeAppend(v) } func (b *TimestampBuilder) AppendNull() { b.Reserve(1) b.UnsafeAppendBoolToBitmap(false) } func (b *TimestampBuilder) AppendNulls(n int) { for i := 0; i < n; i++ { b.AppendNull() } } func (b *TimestampBuilder) AppendEmptyValue() { b.Append(0) } func (b *TimestampBuilder) AppendEmptyValues(n int) { for i := 0; i < n; i++ { b.AppendEmptyValue() } } func (b *TimestampBuilder) UnsafeAppend(v arrow.Timestamp) { bitutil.SetBit(b.nullBitmap.Bytes(), b.length) b.rawData[b.length] = v b.length++ } func (b *TimestampBuilder) UnsafeAppendBoolToBitmap(isValid bool) { if isValid { bitutil.SetBit(b.nullBitmap.Bytes(), b.length) } else { b.nulls++ } b.length++ } // AppendValues will append the values in the v slice. The valid slice determines which values // in v are valid (not null). The valid slice must either be empty or be equal in length to v. If empty, // all values in v are appended and considered valid. func (b *TimestampBuilder) AppendValues(v []arrow.Timestamp, valid []bool) { if len(v) != len(valid) && len(valid) != 0 { panic("len(v) != len(valid) && len(valid) != 0") } if len(v) == 0 { return } b.Reserve(len(v)) arrow.TimestampTraits.Copy(b.rawData[b.length:], v) b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) } func (b *TimestampBuilder) init(capacity int) { b.builder.init(capacity) b.data = memory.NewResizableBuffer(b.mem) bytesN := arrow.TimestampTraits.BytesRequired(capacity) b.data.Resize(bytesN) b.rawData = arrow.TimestampTraits.CastFromBytes(b.data.Bytes()) } // Reserve ensures there is enough space for appending n elements // by checking the capacity and calling Resize if necessary. func (b *TimestampBuilder) Reserve(n int) { b.builder.reserve(n, b.Resize) } // Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(), // additional memory will be allocated. If n is smaller, the allocated memory may reduced. func (b *TimestampBuilder) Resize(n int) { nBuilder := n if n < minBuilderCapacity { n = minBuilderCapacity } if b.capacity == 0 { b.init(n) } else { b.builder.resize(nBuilder, b.init) b.data.Resize(arrow.TimestampTraits.BytesRequired(n)) b.rawData = arrow.TimestampTraits.CastFromBytes(b.data.Bytes()) } } // NewArray creates a Timestamp array from the memory buffers used by the builder and resets the TimestampBuilder // so it can be used to build a new array. func (b *TimestampBuilder) NewArray() arrow.Array { return b.NewTimestampArray() } // NewTimestampArray creates a Timestamp array from the memory buffers used by the builder and resets the TimestampBuilder // so it can be used to build a new array. func (b *TimestampBuilder) NewTimestampArray() (a *Timestamp) { data := b.newData() a = NewTimestampData(data) data.Release() return } func (b *TimestampBuilder) newData() (data *Data) { bytesRequired := arrow.TimestampTraits.BytesRequired(b.length) if bytesRequired > 0 && bytesRequired < b.data.Len() { // trim buffers b.data.Resize(bytesRequired) } data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, b.data}, nil, b.nulls, 0) b.reset() if b.data != nil { b.data.Release() b.data = nil b.rawData = nil } return } func (b *TimestampBuilder) AppendValueFromString(s string) error { if s == NullValueStr { b.AppendNull() return nil } loc, err := b.dtype.GetZone() if err != nil { return err } v, _, err := arrow.TimestampFromStringInLocation(s, b.dtype.Unit, loc) if err != nil { b.AppendNull() return err } b.Append(v) return nil } func (b *TimestampBuilder) UnmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err } switch v := t.(type) { case nil: b.AppendNull() case string: loc, _ := b.dtype.GetZone() tm, _, err := arrow.TimestampFromStringInLocation(v, b.dtype.Unit, loc) if err != nil { return &json.UnmarshalTypeError{ Value: v, Type: reflect.TypeOf(arrow.Timestamp(0)), Offset: dec.InputOffset(), } } b.Append(tm) case json.Number: n, err := v.Int64() if err != nil { return &json.UnmarshalTypeError{ Value: v.String(), Type: reflect.TypeOf(arrow.Timestamp(0)), Offset: dec.InputOffset(), } } b.Append(arrow.Timestamp(n)) case float64: b.Append(arrow.Timestamp(v)) default: return &json.UnmarshalTypeError{ Value: fmt.Sprint(t), Type: reflect.TypeOf(arrow.Timestamp(0)), Offset: dec.InputOffset(), } } return nil } func (b *TimestampBuilder) Unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.UnmarshalOne(dec); err != nil { return err } } return nil } func (b *TimestampBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '[' { return fmt.Errorf("binary builder must unpack from json array, found %s", delim) } return b.Unmarshal(dec) } var ( _ arrow.Array = (*Timestamp)(nil) _ Builder = (*TimestampBuilder)(nil) _ arrow.TypedArray[arrow.Timestamp] = (*Timestamp)(nil) )