in go/adbc/driver/snowflake/record_reader.go [71:246]
func getTransformer(sc *arrow.Schema, ld gosnowflake.ArrowStreamLoader) (*arrow.Schema, recordTransformer) {
loc, types := ld.Location(), ld.RowTypes()
fields := make([]arrow.Field, len(sc.Fields()))
transformers := make([]func(context.Context, arrow.Array) (arrow.Array, error), len(sc.Fields()))
for i, f := range sc.Fields() {
srcMeta := types[i]
switch strings.ToUpper(srcMeta.Type) {
case "FIXED":
switch f.Type.ID() {
case arrow.DECIMAL, arrow.DECIMAL256:
if srcMeta.Scale == 0 {
f.Type = arrow.PrimitiveTypes.Int64
} else {
f.Type = arrow.PrimitiveTypes.Float64
}
transformers[i] = func(ctx context.Context, a arrow.Array) (arrow.Array, error) {
return compute.CastArray(ctx, a, compute.UnsafeCastOptions(f.Type))
}
default:
if srcMeta.Scale != 0 {
f.Type = arrow.PrimitiveTypes.Float64
transformers[i] = func(ctx context.Context, a arrow.Array) (arrow.Array, error) {
result, err := compute.Divide(ctx, compute.ArithmeticOptions{NoCheckOverflow: true},
&compute.ArrayDatum{Value: a.Data()},
compute.NewDatum(math.Pow10(int(srcMeta.Scale))))
if err != nil {
return nil, err
}
defer result.Release()
return result.(*compute.ArrayDatum).MakeArray(), nil
}
} else {
f.Type = arrow.PrimitiveTypes.Int64
transformers[i] = func(ctx context.Context, a arrow.Array) (arrow.Array, error) {
return compute.CastArray(ctx, a, compute.SafeCastOptions(arrow.PrimitiveTypes.Int64))
}
}
}
case "TIME":
f.Type = arrow.FixedWidthTypes.Time64ns
transformers[i] = func(ctx context.Context, a arrow.Array) (arrow.Array, error) {
return compute.CastArray(ctx, a, compute.SafeCastOptions(f.Type))
}
case "TIMESTAMP_NTZ":
dt := &arrow.TimestampType{Unit: arrow.TimeUnit(srcMeta.Scale / 3)}
f.Type = dt
transformers[i] = func(ctx context.Context, a arrow.Array) (arrow.Array, error) {
if a.DataType().ID() != arrow.STRUCT {
return compute.CastArray(ctx, a, compute.SafeCastOptions(dt))
}
pool := compute.GetAllocator(ctx)
tb := array.NewTimestampBuilder(pool, dt)
defer tb.Release()
structData := a.(*array.Struct)
epoch := structData.Field(0).(*array.Int64).Int64Values()
fraction := structData.Field(1).(*array.Int32).Int32Values()
for i := 0; i < a.Len(); i++ {
if a.IsNull(i) {
tb.AppendNull()
continue
}
v, err := arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])), dt.TimeUnit())
if err != nil {
return nil, err
}
tb.Append(v)
}
return tb.NewArray(), nil
}
case "TIMESTAMP_LTZ":
dt := &arrow.TimestampType{Unit: arrow.TimeUnit(srcMeta.Scale) / 3, TimeZone: loc.String()}
f.Type = dt
transformers[i] = func(ctx context.Context, a arrow.Array) (arrow.Array, error) {
pool := compute.GetAllocator(ctx)
tb := array.NewTimestampBuilder(pool, dt)
defer tb.Release()
if a.DataType().ID() == arrow.STRUCT {
structData := a.(*array.Struct)
epoch := structData.Field(0).(*array.Int64).Int64Values()
fraction := structData.Field(1).(*array.Int32).Int32Values()
for i := 0; i < a.Len(); i++ {
if a.IsNull(i) {
tb.AppendNull()
continue
}
v, err := arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])), dt.TimeUnit())
if err != nil {
return nil, err
}
tb.Append(v)
}
} else {
for i, t := range a.(*array.Int64).Int64Values() {
if a.IsNull(i) {
tb.AppendNull()
continue
}
q := int64(t) / int64(math.Pow10(int(srcMeta.Scale)))
r := int64(t) % int64(math.Pow10(int(srcMeta.Scale)))
v, err := arrow.TimestampFromTime(time.Unix(q, r), dt.Unit)
if err != nil {
return nil, err
}
tb.Append(v)
}
}
return tb.NewArray(), nil
}
case "TIMESTAMP_TZ":
// we convert each value to UTC since we have timezone information
// with the data that lets us do so.
dt := &arrow.TimestampType{TimeZone: "UTC", Unit: arrow.TimeUnit(srcMeta.Scale / 3)}
f.Type = dt
transformers[i] = func(ctx context.Context, a arrow.Array) (arrow.Array, error) {
pool := compute.GetAllocator(ctx)
tb := array.NewTimestampBuilder(pool, dt)
defer tb.Release()
structData := a.(*array.Struct)
if structData.NumField() == 2 {
epoch := structData.Field(0).(*array.Int64).Int64Values()
tzoffset := structData.Field(1).(*array.Int32).Int32Values()
for i := 0; i < a.Len(); i++ {
if a.IsNull(i) {
tb.AppendNull()
continue
}
loc := gosnowflake.Location(int(tzoffset[i]) - 1440)
v, err := arrow.TimestampFromTime(time.Unix(epoch[i], 0).In(loc), dt.Unit)
if err != nil {
return nil, err
}
tb.Append(v)
}
} else {
epoch := structData.Field(0).(*array.Int64).Int64Values()
fraction := structData.Field(1).(*array.Int32).Int32Values()
tzoffset := structData.Field(2).(*array.Int32).Int32Values()
for i := 0; i < a.Len(); i++ {
if a.IsNull(i) {
tb.AppendNull()
continue
}
loc := gosnowflake.Location(int(tzoffset[i]) - 1440)
v, err := arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])).In(loc), dt.Unit)
if err != nil {
return nil, err
}
tb.Append(v)
}
}
return tb.NewArray(), nil
}
default:
transformers[i] = identCol
}
fields[i] = f
}
meta := sc.Metadata()
out := arrow.NewSchema(fields, &meta)
return out, getRecTransformer(out, transformers)
}