in arrow/ipc/writer.go [524:852]
// visit encodes arr into p. For every array that is not an extension or
// dictionary array it appends one fieldMetadata entry to w.fields and the
// array's buffers to p.body, then recurses into any child arrays in field
// order.
//
// Extension arrays are encoded as their storage array and dictionary arrays
// as their indices array; neither contributes an entry of its own.
//
// visit returns errMaxRecursion when w.depth is not positive, and errBigArray
// when arr is longer than math.MaxInt32 while w.allow64b is false. Errors from
// child arrays are wrapped with the position/kind of the child. An array whose
// data type is not handled by the type switch causes a panic.
//
// w.depth is always restored to the value it had on entry, including when a
// child visit fails, so a failed encode does not shrink the nesting budget of
// later visits on the same encoder.
func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
	if w.depth <= 0 {
		return errMaxRecursion
	}

	if !w.allow64b && arr.Len() > math.MaxInt32 {
		return errBigArray
	}

	// The nested cases below lower w.depth before visiting their children.
	// Restoring it in a defer (rather than with a trailing w.depth++) keeps
	// the counter balanced on the early error returns as well.
	entryDepth := w.depth
	defer func() { w.depth = entryDepth }()

	if arr.DataType().ID() == arrow.EXTENSION {
		arr := arr.(array.ExtensionArray)
		err := w.visit(p, arr.Storage())
		if err != nil {
			return fmt.Errorf("failed visiting storage of for array %T: %w", arr, err)
		}
		return nil
	}

	if arr.DataType().ID() == arrow.DICTIONARY {
		arr := arr.(*array.Dictionary)
		return w.visit(p, arr.Indices())
	}

	// add all common elements
	w.fields = append(w.fields, fieldMetadata{
		Len:    int64(arr.Len()),
		Nulls:  int64(arr.NullN()),
		Offset: 0,
	})

	if arr.DataType().ID() == arrow.NULL {
		return nil
	}

	if internal.HasValidityBitmap(arr.DataType().ID(), flatbuf.MetadataVersion(currentMetadataVersion)) {
		switch arr.NullN() {
		case 0:
			// there are no null values, drop the null bitmap
			p.body = append(p.body, nil)
		default:
			data := arr.Data()
			var bitmap *memory.Buffer
			if data.NullN() == data.Len() {
				// every value is null, just use a new zero-initialized bitmap to avoid the expense of copying
				bitmap = memory.NewResizableBuffer(w.mem)
				minLength := paddedLength(bitutil.BytesForBits(int64(data.Len())), kArrowAlignment)
				bitmap.Resize(int(minLength))
			} else {
				// otherwise truncate and copy the bits
				bitmap = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
			}
			p.body = append(p.body, bitmap)
		}
	}

	switch dtype := arr.DataType().(type) {
	case *arrow.NullType:
		// ok. NullArrays are completely empty.

	case *arrow.BooleanType:
		var (
			data = arr.Data()
			bitm *memory.Buffer
		)

		// an empty boolean array contributes a nil values buffer
		if data.Len() != 0 {
			bitm = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[1])
		}
		p.body = append(p.body, bitm)

	case arrow.FixedWidthDataType:
		data := arr.Data()
		values := data.Buffers()[1]
		arrLen := int64(arr.Len())
		typeWidth := int64(dtype.BitWidth() / 8)
		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)

		switch {
		case needTruncate(int64(data.Offset()), values, minLength):
			// non-zero offset: slice the buffer
			offset := int64(data.Offset()) * typeWidth
			// send padding if available
			size := min(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len())-offset)
			values = memory.NewBufferBytes(values.Bytes()[offset : offset+size])
		default:
			// the buffer is sent as-is, so the payload takes its own reference
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, values)

	case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.StringType, *arrow.LargeStringType:
		arr := arr.(array.BinaryLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		data := arr.Data()
		values := data.Buffers()[2]

		var totalDataBytes int64
		if voffsets != nil {
			totalDataBytes = int64(len(arr.ValueBytes()))
		}

		switch {
		case needTruncate(int64(data.Offset()), values, totalDataBytes):
			// slice data buffer to include the range we need now.
			var (
				beg  int64 = 0
				size       = min(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes))
			)
			if arr.Len() > 0 {
				beg = arr.ValueOffset64(0)
			}

			values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+size])
		default:
			// the buffer is sent as-is, so the payload takes its own reference
			if values != nil {
				values.Retain()
			}
		}
		// offsets buffer first, then the value bytes
		p.body = append(p.body, voffsets)
		p.body = append(p.body, values)

	case arrow.BinaryViewDataType:
		data := arr.Data()
		values := data.Buffers()[1]
		arrLen := int64(arr.Len())
		typeWidth := int64(arrow.ViewHeaderSizeBytes)
		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)

		switch {
		case needTruncate(int64(data.Offset()), values, minLength):
			// non-zero offset: slice the buffer
			offset := data.Offset() * int(typeWidth)
			// send padding if available
			size := int(min(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len()-offset)))
			values = memory.SliceBuffer(values, offset, size)
		default:
			// the buffer is sent as-is, so the payload takes its own reference
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, values)

		// every buffer after the validity bitmap and the view headers is a
		// variadic data buffer; record how many there are and send them all
		w.variadicCounts = append(w.variadicCounts, int64(len(data.Buffers())-2))
		for _, b := range data.Buffers()[2:] {
			b.Retain()
			p.body = append(p.body, b)
		}

	case *arrow.StructType:
		w.depth--
		arr := arr.(*array.Struct)
		for i := 0; i < arr.NumField(); i++ {
			if err := w.visit(p, arr.Field(i)); err != nil {
				return fmt.Errorf("could not visit field %d of struct-array: %w", i, err)
			}
		}

	case *arrow.SparseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.SparseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		for i := 0; i < arr.NumFields(); i++ {
			if err := w.visit(p, arr.Field(i)); err != nil {
				return fmt.Errorf("could not visit field %d of sparse union array: %w", i, err)
			}
		}

	case *arrow.DenseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.DenseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		dt := arr.UnionType()

		// union type codes are not necessarily 0-indexed
		maxCode := dt.MaxTypeCode()

		// allocate an array of child offsets. Set all to -1 to indicate we
		// haven't observed a first occurrence of a particular child yet
		offsets := make([]int32, maxCode+1)
		lengths := make([]int32, maxCode+1)
		offsets[0], lengths[0] = -1, 0
		// fill the rest of both slices by repeatedly doubling the
		// already-initialised prefix
		for i := 1; i < len(offsets); i *= 2 {
			copy(offsets[i:], offsets[:i])
			copy(lengths[i:], lengths[:i])
		}

		var valueOffsets *memory.Buffer
		if offset != 0 {
			valueOffsets = w.rebaseDenseUnionValueOffsets(arr, offsets, lengths)
		} else {
			valueOffsets = getTruncatedBuffer(int64(offset), int64(length), int32(arrow.Int32SizeBytes), arr.ValueOffsets())
		}
		p.body = append(p.body, valueOffsets)

		// visit children and slice accordingly
		for i := range dt.Fields() {
			child := arr.Field(i)
			// for sliced unions it's tricky to know how much to truncate
			// the children. For now we'll truncate the children to be
			// no longer than the parent union.
			if offset != 0 {
				code := dt.TypeCodes()[i]
				childOffset := offsets[code]
				childLen := lengths[code]

				// the deferred releases below run when visit returns, not
				// at the end of each iteration
				if childOffset > 0 {
					child = array.NewSlice(child, int64(childOffset), int64(childOffset+childLen))
					defer child.Release()
				} else if childLen < int32(child.Len()) {
					child = array.NewSlice(child, 0, int64(childLen))
					defer child.Release()
				}
			}
			if err := w.visit(p, child); err != nil {
				return fmt.Errorf("could not visit field %d of dense union array: %w", i, err)
			}
		}

	case *arrow.MapType, *arrow.ListType, *arrow.LargeListType:
		arr := arr.(array.ListLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		p.body = append(p.body, voffsets)

		w.depth--
		var (
			values       = arr.ListValues()
			mustRelease  = false
			valuesOffset int64
			valuesEnd    int64
		)
		// only a slice created below is owned by this call and needs releasing
		defer func() {
			if mustRelease {
				values.Release()
			}
		}()

		if arr.Len() > 0 && voffsets != nil {
			// start of the first list and end of the last list
			valuesOffset, _ = arr.ValueOffsets(0)
			_, valuesEnd = arr.ValueOffsets(arr.Len() - 1)
		}

		if arr.Len() != 0 || valuesEnd < int64(values.Len()) {
			// must also slice the values
			values = array.NewSlice(values, valuesOffset, valuesEnd)
			mustRelease = true
		}
		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.ListViewType, *arrow.LargeListViewType:
		arr := arr.(array.VarLenListLike)
		voffsets, minOffset, maxEnd := w.getZeroBasedListViewOffsets(arr)
		vsizes := w.getListViewSizes(arr)
		// offsets buffer first, then the sizes buffer
		p.body = append(p.body, voffsets)
		p.body = append(p.body, vsizes)

		w.depth--
		values := arr.ListValues()

		// only send the part of the child that the views reference
		if minOffset != 0 || maxEnd < int64(values.Len()) {
			values = array.NewSlice(values, minOffset, maxEnd)
			defer values.Release()
		}
		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.FixedSizeListType:
		arr := arr.(*array.FixedSizeList)

		w.depth--

		// each list holds exactly size child values, so the child range is
		// the parent's [offset, offset+len) scaled by size
		size := int64(arr.DataType().(*arrow.FixedSizeListType).Len())
		beg := int64(arr.Offset()) * size
		end := int64(arr.Offset()+arr.Len()) * size

		values := array.NewSlice(arr.ListValues(), beg, end)
		defer values.Release()

		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.RunEndEncodedType:
		arr := arr.(*array.RunEndEncoded)
		w.depth--

		// run ends are visited first, then the values
		runEnds := arr.LogicalRunEndsArray(w.mem)
		defer runEnds.Release()
		if err := w.visit(p, runEnds); err != nil {
			return fmt.Errorf("could not visit run ends of run-end encoded array: %w", err)
		}

		values := arr.LogicalValuesArray()
		defer values.Release()
		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit values of run-end encoded array: %w", err)
		}

	default:
		panic(fmt.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr, dtype))
	}

	return nil
}