in go/adbc/driver/snowflake/record_reader.go [416:554]
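// newRecordReader constructs an array.RecordReader over the Arrow batches
// returned by the ArrowStreamLoader. Batches are decoded concurrently, each
// into its own buffered channel, and the reader drains the channels in batch
// order. When Snowflake returns no Arrow data, the result is built from the
// JSON row set instead (or an empty reader if there are no rows).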
func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize int) (array.RecordReader, error) {
	batches, err := ld.GetBatches()
	if err != nil {
		return nil, errToAdbcErr(adbc.StatusInternal, err)
	}
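
	// No Arrow batches were returned. Either the query produced rows that
	// Snowflake only exposes as JSON, or the result set is genuinely empty.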
	if len(batches) == 0 {
		if ld.TotalRows() != 0 {
			// XXX(https://github.com/apache/arrow-adbc/issues/863): Snowflake won't return Arrow data for certain queries
			schema, err := rowTypesToArrowSchema(ctx, ld)
			if err != nil {
				return nil, adbc.Error{
					Msg:  err.Error(),
					Code: adbc.StatusInternal,
				}
			}
			bldr := array.NewRecordBuilder(alloc, schema)
			defer bldr.Release()
			rec, err := jsonDataToArrow(ctx, bldr, ld)
			if err != nil {
				return nil, err
			}
			defer rec.Release()
			return array.NewRecordReader(schema, []arrow.Record{rec})
		}
		schema := arrow.NewSchema([]arrow.Field{}, nil)
		reader, err := array.NewRecordReader(schema, []arrow.Record{})
		if err != nil {
			return nil, adbc.Error{
				Msg:  err.Error(),
				Code: adbc.StatusInternal,
			}
		}
		return reader, nil
	}
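
	// Open the first batch eagerly so the schema (after any transformation)
	// is known before this function returns; its records are decoded on a
	// separate goroutine and buffered in ch.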
	ch := make(chan arrow.Record, bufferSize)
	r, err := batches[0].GetStream(ctx)
	if err != nil {
		return nil, errToAdbcErr(adbc.StatusIO, err)
	}
	rr, err := ipc.NewReader(r, ipc.WithAllocator(alloc))
	if err != nil {
		return nil, adbc.Error{
			Msg:  err.Error(),
			Code: adbc.StatusInvalidState,
		}
	}
	group, ctx := errgroup.WithContext(compute.WithAllocator(ctx, alloc))
	ctx, cancelFn := context.WithCancel(ctx)
	schema, recTransform := getTransformer(rr.Schema(), ld)
	defer func() {
		if err != nil {
			close(ch)
			cancelFn()
		}
	}()
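
	// Producer for the first batch: decode IPC records, apply the transform,
	// and push each record into the first channel.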
	group.Go(func() error {
		defer rr.Release()
		defer r.Close()
		if len(batches) > 1 {
			defer close(ch)
		}
		for rr.Next() && ctx.Err() == nil {
			rec := rr.Record()
			rec, err = recTransform(ctx, rec)
			if err != nil {
				return err
			}
			ch <- rec
		}
		return rr.Err()
	})
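
	// One buffered channel per batch; the reader drains them in order so
	// records come back in the same order as the batches.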
	chs := make([]chan arrow.Record, len(batches))
	chs[0] = ch
	rdr := &reader{
		refCount: 1,
		chs:      chs,
		err:      nil,
		cancelFn: cancelFn,
		schema:   schema,
	}
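
	// Start a worker per remaining batch: each fetches its stream, decodes the
	// IPC records, applies the transform, and writes into its own channel.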
	lastChannelIndex := len(chs) - 1
	go func() {
		for i, b := range batches[1:] {
			batch, batchIdx := b, i+1
			chs[batchIdx] = make(chan arrow.Record, bufferSize)
			group.Go(func() error {
				// close channels (except the last) so that Next can move on to the next channel properly
				if batchIdx != lastChannelIndex {
					defer close(chs[batchIdx])
				}
				rdr, err := batch.GetStream(ctx)
				if err != nil {
					return err
				}
				defer rdr.Close()
				rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
				if err != nil {
					return err
				}
				defer rr.Release()
				for rr.Next() && ctx.Err() == nil {
					rec := rr.Record()
					rec, err = recTransform(ctx, rec)
					if err != nil {
						return err
					}
					chs[batchIdx] <- rec
				}
				return rr.Err()
			})
		}
	}()
	go func() {
		rdr.err = group.Wait()
		// don't close the last channel until after the group is finished,
		// so that Next() can only return after reader.err may have been set
		close(chs[lastChannelIndex])
	}()
	return rdr, nil
}