arrow/array/bufferbuilder.go (180 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array
import (
"sync/atomic"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
)
type bufBuilder interface {
Retain()
Release()
Len() int
Cap() int
Bytes() []byte
resize(int)
Advance(int)
SetLength(int)
Append([]byte)
Reset()
Finish() *memory.Buffer
}
// A bufferBuilder provides common functionality for populating memory with a sequence of type-specific values.
// Specialized implementations provide type-safe APIs for appending and accessing the memory.
type bufferBuilder struct {
refCount atomic.Int64
mem memory.Allocator
buffer *memory.Buffer
length int
capacity int
bytes []byte
}
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (b *bufferBuilder) Retain() {
b.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (b *bufferBuilder) Release() {
debug.Assert(b.refCount.Load() > 0, "too many releases")
if b.refCount.Add(-1) == 0 {
if b.buffer != nil {
b.buffer.Release()
b.buffer, b.bytes = nil, nil
}
}
}
// Len returns the length of the memory buffer in bytes.
func (b *bufferBuilder) Len() int { return b.length }
// Cap returns the total number of bytes that can be stored without allocating additional memory.
func (b *bufferBuilder) Cap() int { return b.capacity }
// Bytes returns a slice of length b.Len().
// The slice is only valid for use until the next buffer modification. That is, until the next call
// to Advance, Reset, Finish or any Append function. The slice aliases the buffer content at least until the next
// buffer modification.
func (b *bufferBuilder) Bytes() []byte { return b.bytes[:b.length] }
func (b *bufferBuilder) resize(elements int) {
if b.buffer == nil {
b.buffer = memory.NewResizableBuffer(b.mem)
}
b.buffer.ResizeNoShrink(elements)
oldCapacity := b.capacity
b.capacity = b.buffer.Cap()
b.bytes = b.buffer.Buf()
if b.capacity > oldCapacity {
memory.Set(b.bytes[oldCapacity:], 0)
}
}
func (b *bufferBuilder) SetLength(length int) {
if length > b.length {
b.Advance(length)
return
}
b.length = length
}
// Advance increases the buffer by length and initializes the skipped bytes to zero.
func (b *bufferBuilder) Advance(length int) {
if b.capacity < b.length+length {
newCapacity := bitutil.NextPowerOf2(b.length + length)
b.resize(newCapacity)
}
b.length += length
}
// Append appends the contents of v to the buffer, resizing it if necessary.
func (b *bufferBuilder) Append(v []byte) {
if b.capacity < b.length+len(v) {
newCapacity := bitutil.NextPowerOf2(b.length + len(v))
b.resize(newCapacity)
}
b.unsafeAppend(v)
}
// Reset returns the buffer to an empty state. Reset releases the memory and sets the length and capacity to zero.
func (b *bufferBuilder) Reset() {
if b.buffer != nil {
b.buffer.Release()
}
b.buffer, b.bytes = nil, nil
b.capacity, b.length = 0, 0
}
// Finish TODO(sgc)
func (b *bufferBuilder) Finish() (buffer *memory.Buffer) {
if b.length > 0 {
b.buffer.ResizeNoShrink(b.length)
}
buffer = b.buffer
b.buffer = nil
b.Reset()
if buffer == nil {
buffer = memory.NewBufferBytes(nil)
}
return
}
func (b *bufferBuilder) unsafeAppend(data []byte) {
copy(b.bytes[b.length:], data)
b.length += len(data)
}
type multiBufferBuilder struct {
refCount atomic.Int64
blockSize int
mem memory.Allocator
blocks []*memory.Buffer
currentOutBuffer int
}
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (b *multiBufferBuilder) Retain() {
b.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (b *multiBufferBuilder) Release() {
debug.Assert(b.refCount.Load() > 0, "too many releases")
if b.refCount.Add(-1) == 0 {
b.Reset()
}
}
func (b *multiBufferBuilder) Reserve(nbytes int) {
if len(b.blocks) == 0 {
out := memory.NewResizableBuffer(b.mem)
if nbytes < b.blockSize {
nbytes = b.blockSize
}
out.Reserve(nbytes)
b.currentOutBuffer = 0
b.blocks = []*memory.Buffer{out}
return
}
curBuf := b.blocks[b.currentOutBuffer]
remain := curBuf.Cap() - curBuf.Len()
if nbytes <= remain {
return
}
// search for underfull block that has enough bytes
for i, block := range b.blocks {
remaining := block.Cap() - block.Len()
if nbytes <= remaining {
b.currentOutBuffer = i
return
}
}
// current buffer doesn't have enough space, no underfull buffers
// make new buffer and set that as our current.
newBuf := memory.NewResizableBuffer(b.mem)
if nbytes < b.blockSize {
nbytes = b.blockSize
}
newBuf.Reserve(nbytes)
b.currentOutBuffer = len(b.blocks)
b.blocks = append(b.blocks, newBuf)
}
func (b *multiBufferBuilder) RemainingBytes() int {
if len(b.blocks) == 0 {
return 0
}
buf := b.blocks[b.currentOutBuffer]
return buf.Cap() - buf.Len()
}
func (b *multiBufferBuilder) Reset() {
b.currentOutBuffer = 0
for _, block := range b.Finish() {
block.Release()
}
}
func (b *multiBufferBuilder) UnsafeAppend(hdr *arrow.ViewHeader, val []byte) {
buf := b.blocks[b.currentOutBuffer]
idx, offset := b.currentOutBuffer, buf.Len()
hdr.SetIndexOffset(int32(idx), int32(offset))
n := copy(buf.Buf()[offset:], val)
buf.ResizeNoShrink(offset + n)
}
func (b *multiBufferBuilder) UnsafeAppendString(hdr *arrow.ViewHeader, val string) {
// create a byte slice with zero-copies
// in go1.20 this would be equivalent to unsafe.StringData
v := *(*[]byte)(unsafe.Pointer(&struct {
string
int
}{val, len(val)}))
b.UnsafeAppend(hdr, v)
}
func (b *multiBufferBuilder) Finish() (out []*memory.Buffer) {
b.currentOutBuffer = 0
out, b.blocks = b.blocks, nil
return
}