func()

in arrow/ipc/writer.go [310:554]


// visit appends the field metadata and body buffers describing arr, and
// recursively those of its children, to w.fields and p.body.
//
// Extension arrays are encoded as their storage array. For every other array
// one fieldMetadata entry is recorded, followed (except for null arrays) by a
// validity-bitmap slot and the type-specific buffers. Nested types then
// recurse into their children with w.depth decremented for the duration of
// the recursion; the depth is restored on both success and error.
//
// It returns errMaxRecursion once w.depth is exhausted, errBigArray when arr
// holds more than math.MaxInt32 elements and w.allow64b is false, and a
// wrapped error when an offsets buffer or a child array cannot be encoded.
// It panics on an array type that none of the cases below handles.
func (w *recordEncoder) visit(p *Payload, arr array.Interface) error {
	if w.depth <= 0 {
		return errMaxRecursion
	}

	if !w.allow64b && arr.Len() > math.MaxInt32 {
		return errBigArray
	}

	if arr.DataType().ID() == arrow.EXTENSION {
		// Extension arrays contribute nothing of their own: only the
		// underlying storage array is visited.
		arr := arr.(array.ExtensionArray)
		err := w.visit(p, arr.Storage())
		if err != nil {
			return xerrors.Errorf("failed visiting storage of for array %T: %w", arr, err)
		}
		return nil
	}

	// add all common elements
	w.fields = append(w.fields, fieldMetadata{
		Len:    int64(arr.Len()),
		Nulls:  int64(arr.NullN()),
		Offset: 0,
	})

	// Null arrays only carry field metadata; no buffers are appended.
	if arr.DataType().ID() == arrow.NULL {
		return nil
	}

	// Validity bitmap slot: nil when the array reports no nulls, otherwise
	// the bitmap restricted to the array's offset/length window.
	switch arr.NullN() {
	case 0:
		p.body = append(p.body, nil)
	default:
		data := arr.Data()
		bitmap := newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
		p.body = append(p.body, bitmap)
	}

	switch dtype := arr.DataType().(type) {
	case *arrow.NullType:
		// ok. NullArrays are completely empty.

	case *arrow.BooleanType:
		var (
			data = arr.Data()
			bitm *memory.Buffer
		)

		// An empty boolean array gets a nil values buffer.
		if data.Len() != 0 {
			bitm = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[1])
		}
		p.body = append(p.body, bitm)

	case arrow.FixedWidthDataType:
		data := arr.Data()
		values := data.Buffers()[1]
		arrLen := int64(arr.Len())
		typeWidth := int64(dtype.BitWidth() / 8)
		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)

		switch {
		case needTruncate(int64(data.Offset()), values, minLength):
			// non-zero offset: slice the buffer
			offset := int64(data.Offset()) * typeWidth
			// send padding if available
			n := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len())-offset)
			values = memory.NewBufferBytes(values.Bytes()[offset : offset+n])
		default:
			// The buffer is stored in the payload as-is, so take a reference.
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, values)

	case *arrow.BinaryType:
		arr := arr.(*array.Binary)
		voffsets, err := w.getZeroBasedValueOffsets(arr)
		if err != nil {
			return xerrors.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err)
		}
		data := arr.Data()
		values := data.Buffers()[2]

		var totalDataBytes int64
		if voffsets != nil {
			totalDataBytes = int64(len(arr.ValueBytes()))
		}

		switch {
		case needTruncate(int64(data.Offset()), values, totalDataBytes):
			// slice data buffer to include the range we need now.
			var (
				beg = int64(arr.ValueOffset(0))
				n   = minI64(paddedLength(totalDataBytes, kArrowAlignment), totalDataBytes)
			)
			values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+n])
		default:
			// The buffer is stored in the payload as-is, so take a reference.
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, voffsets)
		p.body = append(p.body, values)

	case *arrow.StringType:
		arr := arr.(*array.String)
		voffsets, err := w.getZeroBasedValueOffsets(arr)
		if err != nil {
			return xerrors.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err)
		}
		data := arr.Data()
		values := data.Buffers()[2]

		var totalDataBytes int64
		if voffsets != nil {
			totalDataBytes = int64(len(arr.ValueBytes()))
		}

		switch {
		case needTruncate(int64(data.Offset()), values, totalDataBytes):
			// slice data buffer to include the range we need now.
			var (
				beg = int64(arr.ValueOffset(0))
				n   = minI64(paddedLength(totalDataBytes, kArrowAlignment), totalDataBytes)
			)
			values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+n])
		default:
			// The buffer is stored in the payload as-is, so take a reference.
			if values != nil {
				values.Retain()
			}
		}
		p.body = append(p.body, voffsets)
		p.body = append(p.body, values)

	case *arrow.StructType:
		// Restore the depth budget on every exit path, including errors.
		w.depth--
		defer func() { w.depth++ }()

		arr := arr.(*array.Struct)
		for i := 0; i < arr.NumField(); i++ {
			err := w.visit(p, arr.Field(i))
			if err != nil {
				return xerrors.Errorf("could not visit field %d of struct-array: %w", i, err)
			}
		}

	case *arrow.MapType:
		arr := arr.(*array.Map)
		voffsets, err := w.getZeroBasedValueOffsets(arr)
		if err != nil {
			return xerrors.Errorf("could not retrieve zero-based value offsets for array %T: %w", arr, err)
		}
		p.body = append(p.body, voffsets)

		// Restore the depth budget on every exit path, including errors.
		w.depth--
		defer func() { w.depth++ }()

		var (
			values      = arr.ListValues()
			mustRelease = false
			valuesBeg   int64
			valuesLen   int64
		)
		defer func() {
			if mustRelease {
				values.Release()
			}
		}()

		// NOTE(review): this assumes arr.Offsets() is indexed relative to
		// this array's own offset — confirm for sliced arrays.
		if voffsets != nil {
			valuesBeg = int64(arr.Offsets()[0])
			valuesLen = int64(arr.Offsets()[arr.Len()]) - valuesBeg
		}

		if len(arr.Offsets()) != 0 || valuesLen < int64(values.Len()) {
			// must also slice the values. array.NewSlice takes begin and end
			// indices (see the FixedSizeList case), not a begin and a length.
			values = array.NewSlice(values, valuesBeg, valuesBeg+valuesLen)
			mustRelease = true
		}
		err = w.visit(p, values)

		if err != nil {
			return xerrors.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.ListType:
		arr := arr.(*array.List)
		voffsets, err := w.getZeroBasedValueOffsets(arr)
		if err != nil {
			return xerrors.Errorf("could not retrieve zero-based value offsets for array %T: %w", arr, err)
		}
		p.body = append(p.body, voffsets)

		// Restore the depth budget on every exit path, including errors.
		w.depth--
		defer func() { w.depth++ }()

		var (
			values      = arr.ListValues()
			mustRelease = false
			valuesBeg   int64
			valuesLen   int64
		)
		defer func() {
			if mustRelease {
				values.Release()
			}
		}()

		// NOTE(review): this assumes arr.Offsets() is indexed relative to
		// this array's own offset — confirm for sliced arrays.
		if voffsets != nil {
			valuesBeg = int64(arr.Offsets()[0])
			valuesLen = int64(arr.Offsets()[arr.Len()]) - valuesBeg
		}

		if len(arr.Offsets()) != 0 || valuesLen < int64(values.Len()) {
			// must also slice the values. array.NewSlice takes begin and end
			// indices (see the FixedSizeList case), not a begin and a length.
			values = array.NewSlice(values, valuesBeg, valuesBeg+valuesLen)
			mustRelease = true
		}
		err = w.visit(p, values)

		if err != nil {
			return xerrors.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.FixedSizeListType:
		arr := arr.(*array.FixedSizeList)

		// Restore the depth budget on every exit path, including errors.
		w.depth--
		defer func() { w.depth++ }()

		// Each list holds exactly size child elements, so the child window
		// is the list window scaled by size.
		size := int64(dtype.Len())
		beg := int64(arr.Offset()) * size
		end := int64(arr.Offset()+arr.Len()) * size

		values := array.NewSlice(arr.ListValues(), beg, end)
		defer values.Release()

		err := w.visit(p, values)

		if err != nil {
			return xerrors.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	default:
		// An unhandled array type is a programming error in this package,
		// not a condition callers can recover from.
		panic(xerrors.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr, dtype))
	}

	return nil
}