in parquet/pqarrow/encode_arrow.go [243:617]
func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) {
if leafArr.DataType().ID() == arrow.EXTENSION {
extensionArray := leafArr.(array.ExtensionArray)
// Replace leafArr with its underlying storage array
leafArr = extensionArray.Storage()
}
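// noNulls is true when the column is required or the array carries no nulls; together with
// maybeParentNulls it selects between the plain WriteBatch path and WriteBatchSpaced below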
noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0
if ctx.dataBuffer == nil {
ctx.dataBuffer = memory.NewResizableBuffer(cw.Properties().Allocator())
}
switch wr := cw.(type) {
case *file.BooleanColumnChunkWriter:
if leafArr.DataType().ID() != arrow.BOOL {
return fmt.Errorf("type mismatch, column is %s, array is %s", cw.Type(), leafArr.DataType().ID())
}
// TODO(mtopol): optimize this so that we aren't converting from
// the bitmap -> []bool -> bitmap anymore
if leafArr.Len() == 0 {
_, err = wr.WriteBatch(nil, defLevels, repLevels)
break
}
ctx.dataBuffer.ResizeNoShrink(leafArr.Len())
buf := ctx.dataBuffer.Bytes()
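// reinterpret the scratch byte buffer as a []bool (same element size) and fill it from the Boolean array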
data := *(*[]bool)(unsafe.Pointer(&buf))
for idx := range data {
data[idx] = leafArr.(*array.Boolean).Value(idx)
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
case *file.Int32ColumnChunkWriter:
var data []int32
switch leafArr.DataType().ID() {
case arrow.INT32:
data = leafArr.(*array.Int32).Int32Values()
case arrow.DATE32, arrow.UINT32:
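// date32 and uint32 already have a 32-bit physical layout, so the values buffer is reinterpreted in place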
if leafArr.Data().Buffers()[1] != nil {
data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
}
case arrow.TIME32:
if leafArr.DataType().(*arrow.Time32Type).Unit != arrow.Second {
if leafArr.Data().Buffers()[1] != nil {
data = arrow.Int32Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
}
} else { // coerce time32 in seconds to milliseconds by multiplying by 1000
ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
for idx, val := range leafArr.(*array.Time32).Time32Values() {
data[idx] = int32(val) * 1000
}
}
case arrow.NULL:
wr.WriteBatchSpaced(nil, defLevels, repLevels, leafArr.NullBitmapBytes(), 0)
return
default:
// simple integral cases: the parquet physical storage is int32 or int64,
// so anything narrower than 32 bits must be widened into a new int32 slice
ctx.dataBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(leafArr.Len()))
data = arrow.Int32Traits.CastFromBytes(ctx.dataBuffer.Bytes())
switch leafArr.DataType().ID() {
case arrow.UINT8:
for idx, val := range leafArr.(*array.Uint8).Uint8Values() {
data[idx] = int32(val)
}
case arrow.INT8:
for idx, val := range leafArr.(*array.Int8).Int8Values() {
data[idx] = int32(val)
}
case arrow.UINT16:
for idx, val := range leafArr.(*array.Uint16).Uint16Values() {
data[idx] = int32(val)
}
case arrow.INT16:
for idx, val := range leafArr.(*array.Int16).Int16Values() {
data[idx] = int32(val)
}
case arrow.DATE64:
for idx, val := range leafArr.(*array.Date64).Date64Values() {
data[idx] = int32(val / 86400000) // coerce date64 (milliseconds since epoch) to days
}
case arrow.DECIMAL128:
for idx, val := range leafArr.(*array.Decimal128).Values() {
debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "casting Decimal128 greater than the value range; high bits must be 0 or -1")
debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32")
data[idx] = int32(val.LowBits())
}
case arrow.DECIMAL256:
for idx, val := range leafArr.(*array.Decimal256).Values() {
debug.Assert(val.Array()[3] == 0 || val.Array()[3] == math.MaxUint64, "casting Decimal256 greater than the value range; high bits must be 0 or -1")
debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal256 to int32 when value > MaxUint32")
data[idx] = int32(val.LowBits())
}
default:
return fmt.Errorf("type mismatch, column is int32 writer, arrow array is %s, and not a compatible type", leafArr.DataType().Name())
}
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
nulls := leafArr.NullBitmapBytes()
wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
}
case *file.Int64ColumnChunkWriter:
var data []int64
switch leafArr.DataType().ID() {
case arrow.TIMESTAMP:
tstype := leafArr.DataType().(*arrow.TimestampType)
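// the unit is either kept as-is or coerced, depending on the writer properties and the target parquet format version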
if ctx.props.coerceTimestamps {
// user explicitly requested coercion to a specific unit
if tstype.Unit == ctx.props.coerceTimestampUnit {
// no conversion necessary
if leafArr.Data().Buffers()[1] != nil {
data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
}
} else {
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &ctx.props, data); err != nil {
return err
}
}
} else if (cw.Properties().Version() == parquet.V1_0 || cw.Properties().Version() == parquet.V2_4) && tstype.Unit == arrow.Nanosecond {
// absent superseding user instructions, when writing a Parquet version <= 2.4 file,
// timestamps in nanoseconds are coerced to microseconds
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Microsecond), WithTruncatedTimestamps(true))
if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
return err
}
} else if tstype.Unit == arrow.Second {
// absent superseding user instructions, timestamps in seconds are coerced
// to milliseconds
p := NewArrowWriterProperties(WithCoerceTimestamps(arrow.Millisecond))
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
if err := writeCoerceTimestamps(leafArr.(*array.Timestamp), &p, data); err != nil {
return err
}
} else {
// no data conversion necessary
if leafArr.Data().Buffers()[1] != nil {
data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
}
}
case arrow.UINT32:
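// uint32 is widened to int64 so the full unsigned range remains representable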
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
for idx, val := range leafArr.(*array.Uint32).Uint32Values() {
data[idx] = int64(val)
}
case arrow.INT64:
data = leafArr.(*array.Int64).Int64Values()
case arrow.UINT64, arrow.TIME64, arrow.DATE64:
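// these types are already 64 bits wide, so the values buffer is reinterpreted in place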
if leafArr.Data().Buffers()[1] != nil {
data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
}
case arrow.DECIMAL128:
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
for idx, val := range leafArr.(*array.Decimal128).Values() {
debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1")
data[idx] = int64(val.LowBits())
}
case arrow.DECIMAL256:
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
for idx, val := range leafArr.(*array.Decimal256).Values() {
debug.Assert(val.Array()[3] == 0 || val.Array()[3] == math.MaxUint64, "trying to cast Decimal256 to int64 greater than range, high bits must be 0 or -1")
data[idx] = int64(val.LowBits())
}
default:
return fmt.Errorf("unimplemented arrow type to write to int64 column: %s", leafArr.DataType().Name())
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
nulls := leafArr.NullBitmapBytes()
wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
}
case *file.Int96ColumnChunkWriter:
if leafArr.DataType().ID() != arrow.TIMESTAMP {
return errors.New("unsupported arrow type to write to Int96 column")
}
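// convert each Arrow timestamp into the legacy 12-byte Impala/Int96 representation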
ctx.dataBuffer.ResizeNoShrink(parquet.Int96Traits.BytesRequired(leafArr.Len()))
data := parquet.Int96Traits.CastFromBytes(ctx.dataBuffer.Bytes())
input := leafArr.(*array.Timestamp).TimestampValues()
unit := leafArr.DataType().(*arrow.TimestampType).Unit
for idx, val := range input {
arrowTimestampToImpalaTimestamp(unit, int64(val), &data[idx])
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
nulls := leafArr.NullBitmapBytes()
wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset()))
}
case *file.Float32ColumnChunkWriter:
if leafArr.DataType().ID() != arrow.FLOAT32 {
return errors.New("invalid column type to write to Float")
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels)
} else {
wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
case *file.Float64ColumnChunkWriter:
if leafArr.DataType().ID() != arrow.FLOAT64 {
return errors.New("invalid column type to write to Float")
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels)
} else {
wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
case *file.ByteArrayColumnChunkWriter:
var (
buffer = leafArr.Data().Buffers()[2]
valueBuf []byte
)
if buffer == nil {
valueBuf = []byte{}
} else {
valueBuf = buffer.Bytes()
}
data := make([]parquet.ByteArray, leafArr.Len())
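// each ByteArray is a zero-copy view into the value buffer, delimited by consecutive offsets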
switch leafArr.DataType().ID() {
case arrow.BINARY, arrow.STRING:
offsets := leafArr.(binaryarr).ValueOffsets()
for i := range data {
data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
}
case arrow.LARGE_BINARY, arrow.LARGE_STRING:
offsets := leafArr.(binary64arr).ValueOffsets()
for i := range data {
data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
}
default:
return fmt.Errorf("%w: invalid column type to write to ByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
case *file.FixedLenByteArrayColumnChunkWriter:
switch dt := leafArr.DataType().(type) {
case *arrow.FixedSizeBinaryType:
data := make([]parquet.FixedLenByteArray, leafArr.Len())
for idx := range data {
data[idx] = leafArr.(*array.FixedSizeBinary).Value(idx)
}
if !maybeParentNulls && noNulls {
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
case *arrow.Decimal128Type:
// Parquet decimals are stored as fixed-length byte arrays whose length is proportional to
// the precision, while Arrow decimals always occupy 16 (Decimal128) or 32 (Decimal256) bytes;
// the FLBA view must therefore start at an offset that skips the unused leading bytes
offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
scratch := ctx.dataBuffer.Bytes()
typeLen := wr.Descr().TypeLength()
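// fixDecimalEndianness writes the 16-byte value big-endian into scratch and returns the trailing
// typeLen bytes (the minimal big-endian encoding parquet expects), advancing scratch past the consumed bytes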
fixDecimalEndianness := func(in decimal128.Num) parquet.FixedLenByteArray {
out := scratch[offset : offset+typeLen]
binary.BigEndian.PutUint64(scratch, uint64(in.HighBits()))
binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], in.LowBits())
scratch = scratch[2*arrow.Uint64SizeBytes:]
return out
}
data := make([]parquet.FixedLenByteArray, leafArr.Len())
arr := leafArr.(*array.Decimal128)
if leafArr.NullN() == 0 {
for idx := range data {
data[idx] = fixDecimalEndianness(arr.Value(idx))
}
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
for idx := range data {
if arr.IsValid(idx) {
data[idx] = fixDecimalEndianness(arr.Value(idx))
}
}
wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
case *arrow.Decimal256Type:
// Parquet decimals are stored as fixed-length byte arrays whose length is proportional to
// the precision, while Arrow decimals always occupy 16 (Decimal128) or 32 (Decimal256) bytes;
// the FLBA view must therefore start at an offset that skips the unused leading bytes
offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
scratch := ctx.dataBuffer.Bytes()
typeLen := wr.Descr().TypeLength()
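// same scheme as the Decimal128 case above, but writing all four 64-bit words big-endian (32 bytes per value)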
fixDecimalEndianness := func(in decimal256.Num) parquet.FixedLenByteArray {
out := scratch[offset : offset+typeLen]
vals := in.Array()
binary.BigEndian.PutUint64(scratch, vals[3])
binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], vals[2])
binary.BigEndian.PutUint64(scratch[2*arrow.Uint64SizeBytes:], vals[1])
binary.BigEndian.PutUint64(scratch[3*arrow.Uint64SizeBytes:], vals[0])
scratch = scratch[4*arrow.Uint64SizeBytes:]
return out
}
data := make([]parquet.FixedLenByteArray, leafArr.Len())
arr := leafArr.(*array.Decimal256)
if leafArr.NullN() == 0 {
for idx := range data {
data[idx] = fixDecimalEndianness(arr.Value(idx))
}
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
for idx := range data {
if arr.IsValid(idx) {
data[idx] = fixDecimalEndianness(arr.Value(idx))
}
}
wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
case *arrow.Float16Type:
typeLen := wr.Descr().TypeLength()
if typeLen != arrow.Float16SizeBytes {
return fmt.Errorf("%w: invalid FixedLenByteArray length to write from float16 column: %d", arrow.ErrInvalid, typeLen)
}
arr := leafArr.(*array.Float16)
rawValues := arrow.Float16Traits.CastToBytes(arr.Values())
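// each FLBA is a 2-byte view into the raw float16 values; no byte swapping is performed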
data := make([]parquet.FixedLenByteArray, arr.Len())
if arr.NullN() == 0 {
for idx := range data {
offset := idx * typeLen
data[idx] = rawValues[offset : offset+typeLen]
}
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
for idx := range data {
if arr.IsValid(idx) {
offset := idx * typeLen
data[idx] = rawValues[offset : offset+typeLen]
}
}
wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
default:
return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
}
default:
return errors.New("unknown column writer physical type")
}
return
}