in arrow/ipc/writer.go [310:554]
func (w *recordEncoder) visit(p *Payload, arr array.Interface) error {
	// binaryLike is the subset of *array.Binary / *array.String used below.
	type binaryLike interface {
		array.Interface
		ValueBytes() []byte
		ValueOffset(i int) int
	}
	// listLike is the subset of *array.List / *array.Map used below.
	type listLike interface {
		array.Interface
		ListValues() array.Interface
		Offsets() []int32
	}

	if w.depth <= 0 {
		return errMaxRecursion
	}
	if !w.allow64b && arr.Len() > math.MaxInt32 {
		return errBigArray
	}

	// Extension arrays are encoded as their storage array.
	if arr.DataType().ID() == arrow.EXTENSION {
		arr := arr.(array.ExtensionArray)
		if err := w.visit(p, arr.Storage()); err != nil {
			return xerrors.Errorf("failed visiting storage of for array %T: %w", arr, err)
		}
		return nil
	}

	// add all common elements
	w.fields = append(w.fields, fieldMetadata{
		Len:    int64(arr.Len()),
		Nulls:  int64(arr.NullN()),
		Offset: 0,
	})

	if arr.DataType().ID() == arrow.NULL {
		return nil
	}

	// Validity bitmap: a nil placeholder when there are no nulls.
	if arr.NullN() == 0 {
		p.body = append(p.body, nil)
	} else {
		data := arr.Data()
		bitmap := newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
		p.body = append(p.body, bitmap)
	}

	switch dtype := arr.DataType().(type) {
	case *arrow.NullType:
		// ok. NullArrays are completely empty.

	case *arrow.BooleanType:
		var bitmap *memory.Buffer
		if data := arr.Data(); data.Len() != 0 {
			bitmap = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[1])
		}
		p.body = append(p.body, bitmap)

	case arrow.FixedWidthDataType:
		data := arr.Data()
		values := data.Buffers()[1]
		arrLen := int64(arr.Len())
		typeWidth := int64(dtype.BitWidth() / 8)
		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)
		if needTruncate(int64(data.Offset()), values, minLength) {
			// non-zero offset: slice the buffer
			offset := int64(data.Offset()) * typeWidth
			// send padding if available
			n := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len())-offset)
			values = memory.NewBufferBytes(values.Bytes()[offset : offset+n])
		} else if values != nil {
			values.Retain()
		}
		p.body = append(p.body, values)

	case *arrow.BinaryType, *arrow.StringType:
		arr := arr.(binaryLike)
		voffsets, err := w.getZeroBasedValueOffsets(arr)
		if err != nil {
			return xerrors.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err)
		}
		data := arr.Data()
		values := data.Buffers()[2]
		var totalDataBytes int64
		if voffsets != nil {
			totalDataBytes = int64(len(arr.ValueBytes()))
		}
		if needTruncate(int64(data.Offset()), values, totalDataBytes) {
			// Slice the data buffer down to the bytes this (possibly sliced)
			// array references. The start offset is only looked up when
			// bytes are actually needed, so empty slices never index it.
			var beg int64
			if totalDataBytes > 0 {
				beg = int64(arr.ValueOffset(0))
			}
			values = memory.NewBufferBytes(values.Bytes()[beg : beg+totalDataBytes])
		} else if values != nil {
			values.Retain()
		}
		p.body = append(p.body, voffsets, values)

	case *arrow.StructType:
		w.depth--
		// Restore depth on every exit path, including errors.
		defer func() { w.depth++ }()
		arr := arr.(*array.Struct)
		for i := 0; i < arr.NumField(); i++ {
			if err := w.visit(p, arr.Field(i)); err != nil {
				return xerrors.Errorf("could not visit field %d of struct-array: %w", i, err)
			}
		}

	case *arrow.ListType, *arrow.MapType:
		arr := arr.(listLike)
		voffsets, err := w.getZeroBasedValueOffsets(arr)
		if err != nil {
			return xerrors.Errorf("could not retrieve zero-based value offsets for array %T: %w", arr, err)
		}
		p.body = append(p.body, voffsets)

		w.depth--
		// Restore depth on every exit path, including errors.
		defer func() { w.depth++ }()

		values := arr.ListValues()
		var valuesBeg, valuesEnd int64
		if voffsets != nil {
			// NOTE(review): assumes Offsets() exposes the offsets buffer
			// without the array's own slice offset applied (consistent with
			// the need for getZeroBasedValueOffsets above), so the range of
			// child values is read starting at Data().Offset().
			offsets := arr.Offsets()
			first := arr.Data().Offset()
			valuesBeg = int64(offsets[first])
			valuesEnd = int64(offsets[first+arr.Len()])
		}
		if valuesBeg != 0 || valuesEnd < int64(values.Len()) {
			// must also slice the values; NewSlice takes [beg, end) indices.
			values = array.NewSlice(values, valuesBeg, valuesEnd)
			defer values.Release()
		}
		if err = w.visit(p, values); err != nil {
			return xerrors.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.FixedSizeListType:
		arr := arr.(*array.FixedSizeList)
		w.depth--
		// Restore depth on every exit path, including errors.
		defer func() { w.depth++ }()
		size := int64(dtype.Len())
		beg := int64(arr.Offset()) * size
		end := int64(arr.Offset()+arr.Len()) * size
		values := array.NewSlice(arr.ListValues(), beg, end)
		defer values.Release()
		if err := w.visit(p, values); err != nil {
			return xerrors.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	default:
		panic(xerrors.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr, dtype))
	}
	return nil
}