func writeDenseArrow()

in parquet/pqarrow/encode_arrow.go [243:617]


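// writeDenseArrow writes the values of a dense (non-dictionary) Arrow array to the
// given Parquet column chunk writer, dispatching on the writer's physical type and
// coercing Arrow values into the Parquet physical representation where required.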
func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) {
	if leafArr.DataType().ID() == arrow.EXTENSION {
		extensionArray := leafArr.(array.ExtensionArray)
		// Replace leafArr with its underlying storage array
		leafArr = extensionArray.Storage()
	}

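	// values can be written non-spaced when the column is required or the array contains no nulls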
	noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0

	if ctx.dataBuffer == nil {
		ctx.dataBuffer = memory.NewResizableBuffer(cw.Properties().Allocator())
	}

	switch wr := cw.(type) {
	case *file.BooleanColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.BOOL {
			return fmt.Errorf("type mismatch, column is %s, array is %s", cw.Type(), leafArr.DataType().ID())
		}
		// TODO(mtopol): optimize this so that we aren't converting from
		// the bitmap -> []bool -> bitmap anymore
		if leafArr.Len() == 0 {
			_, err = wr.WriteBatch(nil, defLevels, repLevels)
			break
		}

		ctx.dataBuffer.ResizeNoShrink(leafArr.Len())
		buf := ctx.dataBuffer.Bytes()
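		// reinterpret the scratch []byte as []bool in place; both have 1-byte elements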
		data := *(*[]bool)(unsafe.Pointer(&buf))
		for idx := range data {
			data[idx] = leafArr.(*array.Boolean).Value(idx)
		}
		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}
	case *file.Int32ColumnChunkWriter:
		var data []int32
		switch leafArr.DataType().ID() {
		case arrow.INT32:
			data = leafArr.(*array.Int32).Int32Values()
		case arrow.DATE32, arrow.UINT32:
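			// Date32 and Uint32 values are 32 bits wide, so the physical buffer can be
			// reinterpreted as int32 without conversion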
			if leafArr.Data().Buffers()[1] != nil {
				data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
				data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
			}
		case arrow.TIME32:
			if leafArr.DataType().(*arrow.Time32Type).Unit != arrow.Second {
				if leafArr.Data().Buffers()[1] != nil {
					data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
					data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
				}
			} else { // Parquet TIME has no seconds unit, so coerce seconds to milliseconds by multiplying by 1000
				ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
				data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
				for idx, val := range leafArr.(*array.Time32).Time32Values() {
					data[idx] = int32(val) * 1000
				}
			}
		case arrow.NULL:
			wr.WriteBatchSpaced(nil, defLevels, repLevels, leafArr.NullBitmapBytes(), 0)
			return

		default:
			// simple integral cases: parquet physical storage is int32 or int64,
			// so we have to create a new array of int32s for anything smaller
			// than 32 bits
			ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			switch leafArr.DataType().ID() {
			case arrow.UINT8:
				for idx, val := range leafArr.(*array.Uint8).Uint8Values() {
					data[idx] = int32(val)
				}
			case arrow.INT8:
				for idx, val := range leafArr.(*array.Int8).Int8Values() {
					data[idx] = int32(val)
				}
			case arrow.UINT16:
				for idx, val := range leafArr.(*array.Uint16).Uint16Values() {
					data[idx] = int32(val)
				}
			case arrow.INT16:
				for idx, val := range leafArr.(*array.Int16).Int16Values() {
					data[idx] = int32(val)
				}
			case arrow.DATE64:
				for idx, val := range leafArr.(*array.Date64).Date64Values() {
					data[idx] = int32(val / 86400000) // coerce date64 (milliseconds) to date32 (days)
				}
			case arrow.DECIMAL128:
				for idx, val := range leafArr.(*array.Decimal128).Values() {
					debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "casting Decimal128 greater than the value range; high bits must be 0 or -1")
					debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32")
					data[idx] = int32(val.LowBits())
				}
			case arrow.DECIMAL256:
				for idx, val := range leafArr.(*array.Decimal256).Values() {
					debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "casting Decimal256 greater than the value range; high bits must be 0 or -1")
					debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal256 to int32 when value > MaxUint32")
					data[idx] = int32(val.LowBits())
				}
			default:
				return fmt.Errorf("type mismatch, column is int32 writer, arrow array is %s, and not a compatible type", leafArr.DataType().Name())
			}
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			nulls := leafArr.NullBitmapBytes()
			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
		}
	case *file.Int64ColumnChunkWriter:
		var data []int64
		switch leafArr.DataType().ID() {
		case arrow.TIMESTAMP:
			tstype := leafArr.DataType().(*arrow.TimestampType)
			if ctx.props.coerceTimestamps {
				// user explicitly requested coercion to a specific unit
				if tstype.Unit == ctx.props.coerceTimestampUnit {
					// no conversion necessary
					if leafArr.Data().Buffers()[1] != nil {
						data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
						data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
					}
				} else {
					ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
					data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
					if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &ctx.props, data); err != nil {
						return err
					}
				}
			} else if (cw.Properties().Version() == parquet.V1_0 || cw.Properties().Version() == parquet.V2_4) && tstype.Unit == arrow.Nanosecond {
				// absent superseding user instructions, when writing a Parquet version <= 2.4 file,
				// timestamps in nanoseconds are coerced to microseconds
				ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
				data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
				p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Microsecond), WithTruncatedTimestamps(true))
				if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
					return err
				}
			} else if tstype.Unit == arrow.Second {
				// absent superseding user instructions, timestamps in seconds are coerced
				// to milliseconds
				p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Millisecond))
				ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
				data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
				if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
					return err
				}
			} else {
				// no data conversion necessary
				if leafArr.Data().Buffers()[1] != nil {
					data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
					data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
				}
			}
		case arrow.UINT32:
			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			for idx, val := range leafArr.(*array.Uint32).Uint32Values() {
				data[idx] = int64(val)
			}
		case arrow.INT64:
			data = leafArr.(*array.Int64).Int64Values()
		case arrow.UINT64, arrow.TIME64, arrow.DATE64:
			if leafArr.Data().Buffers()[1] != nil {
				data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
				data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
			}
		case arrow.DECIMAL128:
			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			for idx, val := range leafArr.(*array.Decimal128).Values() {
				debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1")
				data[idx] = int64(val.LowBits())
			}
		case arrow.DECIMAL256:
			ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
			data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
			for idx, val := range leafArr.(*array.Decimal256).Values() {
				debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "trying to cast Decimal256 to int64 greater than range, high bits must be 0 or -1")
				data[idx] = int64(val.LowBits())
			}
		default:
			return fmt.Errorf("unimplemented arrow type to write to int64 column: %s", leafArr.DataType().Name())
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			nulls := leafArr.NullBitmapBytes()
			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
		}
	case *file.Int96ColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.TIMESTAMP {
			return errors.New("unsupported arrow type to write to Int96 column")
		}
		ctx.dataBuffer.ResizeNoShrink(parquet.Int96Traits.BytesRequired(leafArr.Len()))
		data := parquet.Int96Traits.CastFromBytes(ctx.dataBuffer.Bytes())
		input := leafArr.(*array.Timestamp).TimestampValues()
		unit := leafArr.DataType().(*arrow.TimestampType).Unit
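		// convert each timestamp into the legacy Impala-style Int96 layout
		// (nanoseconds within the day followed by the Julian day number)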
		for idx, val := range input {
			arrowTimestampToImpalaTimestamp(unit, int64(val), &data[idx])
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			nulls := leafArr.NullBitmapBytes()
			wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
		}
	case *file.Float32ColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.FLOAT32 {
			return errors.New("invalid column type to write to Float")
		}
		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}
	case *file.Float64ColumnChunkWriter:
		if leafArr.DataType().ID() != arrow.FLOAT64 {
			return errors.New("invalid column type to write to Float")
		}
		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}
	case *file.ByteArrayColumnChunkWriter:
		var (
			buffer   = leafArr.Data().Buffers()[2]
			valueBuf []byte
		)

		if buffer == nil {
			valueBuf = []byte{}
		} else {
			valueBuf = buffer.Bytes()
		}

		data := make([]parquet.ByteArray, leafArr.Len())
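		// build ByteArray views over the shared value buffer using the array's
		// value offsets; no bytes are copied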
		switch leafArr.DataType().ID() {
		case arrow.BINARY, arrow.STRING:
			offsets := leafArr.(binaryarr).ValueOffsets()
			for i := range data {
				data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
			}
		case arrow.LARGE_BINARY, arrow.LARGE_STRING:
			offsets := leafArr.(binary64arr).ValueOffsets()
			for i := range data {
				data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
			}
		default:
			return fmt.Errorf("%w: invalid column type to write to ByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
		}

		if !maybeParentNulls && noNulls {
			_, err = wr.WriteBatch(data, defLevels, repLevels)
		} else {
			wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
		}

	case *file.FixedLenByteArrayColumnChunkWriter:
		switch dt := leafArr.DataType().(type) {
		case *arrow.FixedSizeBinaryType:
			data := make([]parquet.FixedLenByteArray, leafArr.Len())
			for idx := range data {
				data[idx] = leafArr.(*array.FixedSizeBinary).Value(idx)
			}
			if !maybeParentNulls && noNulls {
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
			}
		case *arrow.Decimal128Type:
			// Parquet decimals are stored as FixedLength values whose length is
			// proportional to the precision, while Arrow decimals always occupy 16/32
			// bytes; thus the internal FLBA must be adjusted by the offset calculation
			offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
			ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
			scratch := ctx.dataBuffer.Bytes()
			typeLen := wr.Descr().TypeLength()
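			// write the full 16-byte big-endian value into scratch and return only the
			// trailing typeLen bytes, which hold the representation parquet expects;
			// scratch is advanced so each call uses a fresh region of the buffer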
			fixDecimalEndianness := func(in decimal128.Num) parquet.FixedLenByteArray {
				out := scratch[offset : offset+typeLen]
				binary.BigEndian.PutUint64(scratch, uint64(in.HighBits()))
				binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], in.LowBits())
				scratch = scratch[2*arrow.Uint64SizeBytes:]
				return out
			}

			data := make([]parquet.FixedLenByteArray, leafArr.Len())
			arr := leafArr.(*array.Decimal128)
			if leafArr.NullN() == 0 {
				for idx := range data {
					data[idx] = fixDecimalEndianness(arr.Value(idx))
				}
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				for idx := range data {
					if arr.IsValid(idx) {
						data[idx] = fixDecimalEndianness(arr.Value(idx))
					}
				}
				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
			}
		case *arrow.Decimal256Type:
			// Parquet decimals are stored as FixedLength values whose length is
			// proportional to the precision, while Arrow decimals always occupy 16/32
			// bytes; thus the internal FLBA must be adjusted by the offset calculation
			offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
			ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
			scratch := ctx.dataBuffer.Bytes()
			typeLen := wr.Descr().TypeLength()
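			// same approach as the Decimal128 case above, but writing the full 32-byte
			// big-endian value; only the trailing typeLen bytes are returned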
			fixDecimalEndianness := func(in decimal256.Num) parquet.FixedLenByteArray {
				out := scratch[offset : offset+typeLen]
				vals := in.Array()
				binary.BigEndian.PutUint64(scratch, vals[3])
				binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], vals[2])
				binary.BigEndian.PutUint64(scratch[2*arrow.Uint64SizeBytes:], vals[1])
				binary.BigEndian.PutUint64(scratch[3*arrow.Uint64SizeBytes:], vals[0])
				scratch = scratch[4*arrow.Uint64SizeBytes:]
				return out
			}

			data := make([]parquet.FixedLenByteArray, leafArr.Len())
			arr := leafArr.(*array.Decimal256)
			if leafArr.NullN() == 0 {
				for idx := range data {
					data[idx] = fixDecimalEndianness(arr.Value(idx))
				}
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				for idx := range data {
					if arr.IsValid(idx) {
						data[idx] = fixDecimalEndianness(arr.Value(idx))
					}
				}
				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
			}
		case *arrow.Float16Type:
			typeLen := wr.Descr().TypeLength()
			if typeLen != arrow.Float16SizeBytes {
				return fmt.Errorf("%w: invalid FixedLenByteArray length to write from float16 column: %d", arrow.ErrInvalid, typeLen)
			}

			arr := leafArr.(*array.Float16)
			rawValues := arrow.Float16Traits.CastToBytes(arr.Values())
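			// each Float16 occupies 2 bytes (typeLen), so the raw byte buffer is sliced
			// into typeLen-sized views rather than copying values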
			data := make([]parquet.FixedLenByteArray, arr.Len())

			if arr.NullN() == 0 {
				for idx := range data {
					offset := idx * typeLen
					data[idx] = rawValues[offset : offset+typeLen]
				}
				_, err = wr.WriteBatch(data, defLevels, repLevels)
			} else {
				for idx := range data {
					if arr.IsValid(idx) {
						offset := idx * typeLen
						data[idx] = rawValues[offset : offset+typeLen]
					}
				}
				wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
			}
		default:
			return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
		}
	default:
		return errors.New("unknown column writer physical type")
	}
	return
}