func()

in arrow/ipc/writer.go [524:852]


// visit appends the IPC representation of arr to the payload p. For most
// arrays it records one fieldMetadata entry on w.fields, appends the array's
// buffers to p.body, and then visits any child arrays in field order.
//
// Extension arrays are encoded as their storage array and dictionary arrays as
// their indices array; neither records a field entry of its own. Null arrays
// record only the field entry and contribute no buffers.
//
// It returns errMaxRecursion when w.depth is exhausted, errBigArray when arr
// holds more than math.MaxInt32 elements and w.allow64b is not set, and
// otherwise the wrapped error of the first child that failed to encode.
// w.depth is left at its entry value on every return path. visit panics if
// arr has a data type that none of the cases below handles.
func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
	if w.depth <= 0 {
		return errMaxRecursion
	}

	if !w.allow64b && arr.Len() > math.MaxInt32 {
		return errBigArray
	}

	if arr.DataType().ID() == arrow.EXTENSION {
		arr := arr.(array.ExtensionArray)
		if err := w.visit(p, arr.Storage()); err != nil {
			return fmt.Errorf("failed visiting storage of for array %T: %w", arr, err)
		}
		return nil
	}

	if arr.DataType().ID() == arrow.DICTIONARY {
		// only the indices are written here; the dictionary values are not
		// touched by this method.
		arr := arr.(*array.Dictionary)
		return w.visit(p, arr.Indices())
	}

	// add all common elements
	w.fields = append(w.fields, fieldMetadata{
		Len:    int64(arr.Len()),
		Nulls:  int64(arr.NullN()),
		Offset: 0,
	})

	if arr.DataType().ID() == arrow.NULL {
		return nil
	}

	if internal.HasValidityBitmap(arr.DataType().ID(), flatbuf.MetadataVersion(currentMetadataVersion)) {
		if arr.NullN() == 0 {
			// there are no null values, drop the null bitmap
			p.body = append(p.body, nil)
		} else {
			var (
				data   = arr.Data()
				bitmap *memory.Buffer
			)
			if data.NullN() == data.Len() {
				// every value is null, just use a new zero-initialized bitmap to avoid the expense of copying
				bitmap = memory.NewResizableBuffer(w.mem)
				minLength := paddedLength(bitutil.BytesForBits(int64(data.Len())), kArrowAlignment)
				bitmap.Resize(int(minLength))
			} else {
				// otherwise truncate and copy the bits
				bitmap = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
			}
			p.body = append(p.body, bitmap)
		}
	}

	// The nested cases below spend one level of the recursion budget while
	// their children are visited. Restoring the entry value from a defer
	// (rather than with a trailing w.depth++) also covers the error returns,
	// so a failed visit does not leave the encoder with a reduced budget.
	depth := w.depth
	defer func() { w.depth = depth }()

	// NOTE: case order matters. The interface cases (FixedWidthDataType,
	// BinaryViewDataType) must stay after the concrete types that are handled
	// separately above them.
	switch dtype := arr.DataType().(type) {
	case *arrow.NullType:
		// ok. NullArrays are completely empty.

	case *arrow.BooleanType:
		// the values buffer is handled as a bitmap, like the validity buffer.
		var (
			data = arr.Data()
			bitm *memory.Buffer
		)

		if data.Len() != 0 {
			bitm = newTruncatedBitmap(w.mem, int64(data.Offset()), int64(data.Len()), data.Buffers()[1])
		}
		p.body = append(p.body, bitm)

	case arrow.FixedWidthDataType:
		var (
			data      = arr.Data()
			values    = data.Buffers()[1]
			arrLen    = int64(arr.Len())
			typeWidth = int64(dtype.BitWidth() / 8)
			minLength = paddedLength(arrLen*typeWidth, kArrowAlignment)
		)

		if needTruncate(int64(data.Offset()), values, minLength) {
			// non-zero offset: slice the buffer
			offset := int64(data.Offset()) * typeWidth
			// send padding if available
			size := min(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len())-offset)
			values = memory.NewBufferBytes(values.Bytes()[offset : offset+size])
		} else if values != nil {
			// the buffer is passed through untouched; the payload takes its
			// own reference.
			values.Retain()
		}
		p.body = append(p.body, values)

	case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.StringType, *arrow.LargeStringType:
		arr := arr.(array.BinaryLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		data := arr.Data()
		values := data.Buffers()[2]

		var totalDataBytes int64
		if voffsets != nil {
			totalDataBytes = int64(len(arr.ValueBytes()))
		}

		if needTruncate(int64(data.Offset()), values, totalDataBytes) {
			// slice data buffer to include the range we need now.
			var (
				beg  int64
				size = min(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes))
			)
			if arr.Len() > 0 {
				beg = arr.ValueOffset64(0)
			}

			values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+size])
		} else if values != nil {
			values.Retain()
		}
		// offsets first, then the value bytes.
		p.body = append(p.body, voffsets)
		p.body = append(p.body, values)

	case arrow.BinaryViewDataType:
		var (
			data      = arr.Data()
			values    = data.Buffers()[1]
			arrLen    = int64(arr.Len())
			typeWidth = int64(arrow.ViewHeaderSizeBytes)
			minLength = paddedLength(arrLen*typeWidth, kArrowAlignment)
		)

		if needTruncate(int64(data.Offset()), values, minLength) {
			// non-zero offset: slice the buffer
			offset := data.Offset() * int(typeWidth)
			// send padding if available
			size := int(min(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len()-offset)))
			values = memory.SliceBuffer(values, offset, size)
		} else if values != nil {
			values.Retain()
		}
		p.body = append(p.body, values)

		// every buffer after the first two is a variadic data buffer; record
		// how many this array contributes and pass each one through whole.
		w.variadicCounts = append(w.variadicCounts, int64(len(data.Buffers())-2))
		for _, b := range data.Buffers()[2:] {
			b.Retain()
			p.body = append(p.body, b)
		}

	case *arrow.StructType:
		w.depth--
		arr := arr.(*array.Struct)
		for i := 0; i < arr.NumField(); i++ {
			if err := w.visit(p, arr.Field(i)); err != nil {
				return fmt.Errorf("could not visit field %d of struct-array: %w", i, err)
			}
		}

	case *arrow.SparseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.SparseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		for i := 0; i < arr.NumFields(); i++ {
			if err := w.visit(p, arr.Field(i)); err != nil {
				return fmt.Errorf("could not visit field %d of sparse union array: %w", i, err)
			}
		}

	case *arrow.DenseUnionType:
		offset, length := arr.Data().Offset(), arr.Len()
		arr := arr.(*array.DenseUnion)
		typeCodes := getTruncatedBuffer(int64(offset), int64(length), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
		p.body = append(p.body, typeCodes)

		w.depth--
		dt := arr.UnionType()

		// union type codes are not necessarily 0-indexed
		maxCode := dt.MaxTypeCode()

		// allocate an array of child offsets. Set all to -1 to indicate we
		// haven't observed a first occurrence of a particular child yet.
		// The size is computed in int: if the type-code type is a narrow
		// integer, maxCode+1 could wrap around for the largest code.
		numCodes := int(maxCode) + 1
		offsets := make([]int32, numCodes)
		lengths := make([]int32, numCodes)
		for i := range offsets {
			offsets[i] = -1
		}

		var valueOffsets *memory.Buffer
		if offset != 0 {
			// receives offsets and lengths; they are read back per child below.
			valueOffsets = w.rebaseDenseUnionValueOffsets(arr, offsets, lengths)
		} else {
			valueOffsets = getTruncatedBuffer(int64(offset), int64(length), int32(arrow.Int32SizeBytes), arr.ValueOffsets())
		}
		p.body = append(p.body, valueOffsets)

		// visit children and slice accordingly
		for i := range dt.Fields() {
			child := arr.Field(i)
			// for sliced unions it's tricky to know how much to truncate
			// the children. For now we'll truncate the children to be
			// no longer than the parent union.

			if offset != 0 {
				code := dt.TypeCodes()[i]
				childOffset := offsets[code]
				childLen := lengths[code]

				// the slices are released when visit returns, not at the end
				// of the iteration.
				if childOffset > 0 {
					child = array.NewSlice(child, int64(childOffset), int64(childOffset+childLen))
					defer child.Release()
				} else if childLen < int32(child.Len()) {
					child = array.NewSlice(child, 0, int64(childLen))
					defer child.Release()
				}
			}
			if err := w.visit(p, child); err != nil {
				return fmt.Errorf("could not visit field %d of dense union array: %w", i, err)
			}
		}

	case *arrow.MapType, *arrow.ListType, *arrow.LargeListType:
		arr := arr.(array.ListLike)
		voffsets := w.getZeroBasedValueOffsets(arr)
		p.body = append(p.body, voffsets)

		w.depth--
		var (
			values      = arr.ListValues()
			valuesStart int64
			valuesEnd   int64
		)

		if arr.Len() > 0 && voffsets != nil {
			// start of the first element and end of the last one bound the
			// range of child values to write.
			valuesStart, _ = arr.ValueOffsets(0)
			_, valuesEnd = arr.ValueOffsets(arr.Len() - 1)
		}

		if arr.Len() != 0 || valuesEnd < int64(values.Len()) {
			// must also slice the values
			values = array.NewSlice(values, valuesStart, valuesEnd)
			defer values.Release()
		}

		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.ListViewType, *arrow.LargeListViewType:
		arr := arr.(array.VarLenListLike)

		voffsets, minOffset, maxEnd := w.getZeroBasedListViewOffsets(arr)
		vsizes := w.getListViewSizes(arr)

		// offsets first, then sizes.
		p.body = append(p.body, voffsets)
		p.body = append(p.body, vsizes)

		w.depth--
		values := arr.ListValues()
		if minOffset != 0 || maxEnd < int64(values.Len()) {
			values = array.NewSlice(values, minOffset, maxEnd)
			defer values.Release()
		}

		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.FixedSizeListType:
		arr := arr.(*array.FixedSizeList)

		w.depth--

		// each list slot covers exactly size child values, so the child range
		// follows directly from the array's offset and length.
		size := int64(dtype.Len())
		beg := int64(arr.Offset()) * size
		end := int64(arr.Offset()+arr.Len()) * size

		values := array.NewSlice(arr.ListValues(), beg, end)
		defer values.Release()

		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", arr, err)
		}

	case *arrow.RunEndEncodedType:
		arr := arr.(*array.RunEndEncoded)
		w.depth--

		// run ends are written first, then the values.
		runEnds := arr.LogicalRunEndsArray(w.mem)
		defer runEnds.Release()
		if err := w.visit(p, runEnds); err != nil {
			return fmt.Errorf("could not visit run ends of run-end encoded array: %w", err)
		}

		values := arr.LogicalValuesArray()
		defer values.Release()
		if err := w.visit(p, values); err != nil {
			return fmt.Errorf("could not visit values of run-end encoded array: %w", err)
		}

	default:
		// reaching this means a data type was added without encoder support:
		// a programming error rather than a property of the input.
		panic(fmt.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr, dtype))
	}

	return nil
}