func newRecordReader()

in go/adbc/driver/snowflake/record_reader.go [416:554]


// newRecordReader builds an array.RecordReader over the Arrow result batches
// produced by the Snowflake ArrowStreamLoader.
//
// The first batch is decoded on an errgroup worker started here; the remaining
// batches are fetched and decoded concurrently, each feeding its own buffered
// channel (capacity bufferSize) so the returned reader can consume them in
// batch order. Records are passed through recTransform (from getTransformer)
// before being delivered.
//
// Special cases when Snowflake returns no Arrow batches:
//   - TotalRows() != 0: the server sent JSON row data instead of Arrow (see
//     https://github.com/apache/arrow-adbc/issues/863); it is converted to a
//     single Arrow record via jsonDataToArrow.
//   - otherwise: an empty reader with an empty schema is returned.
func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize int) (array.RecordReader, error) {
	batches, err := ld.GetBatches()
	if err != nil {
		return nil, errToAdbcErr(adbc.StatusInternal, err)
	}

	if len(batches) == 0 {
		if ld.TotalRows() != 0 {
			// XXX(https://github.com/apache/arrow-adbc/issues/863): Snowflake won't return Arrow data for certain queries
			schema, err := rowTypesToArrowSchema(ctx, ld)
			if err != nil {
				return nil, adbc.Error{
					Msg:  err.Error(),
					Code: adbc.StatusInternal,
				}
			}

			bldr := array.NewRecordBuilder(alloc, schema)
			defer bldr.Release()

			rec, err := jsonDataToArrow(ctx, bldr, ld)
			if err != nil {
				return nil, err
			}
			defer rec.Release()

			// NewRecordReader retains rec; the deferred Release drops
			// only this function's reference.
			return array.NewRecordReader(schema, []arrow.Record{rec})
		}
		// No batches and no rows: empty result set with an empty schema.
		schema := arrow.NewSchema([]arrow.Field{}, nil)
		reader, err := array.NewRecordReader(schema, []arrow.Record{})
		if err != nil {
			return nil, adbc.Error{
				Msg:  err.Error(),
				Code: adbc.StatusInternal,
			}
		}
		return reader, nil
	}

	// Decode the first batch eagerly so we can learn the (transformed)
	// schema before returning the reader.
	ch := make(chan arrow.Record, bufferSize)
	r, err := batches[0].GetStream(ctx)
	if err != nil {
		return nil, errToAdbcErr(adbc.StatusIO, err)
	}

	rr, err := ipc.NewReader(r, ipc.WithAllocator(alloc))
	if err != nil {
		return nil, adbc.Error{
			Msg:  err.Error(),
			Code: adbc.StatusInvalidState,
		}
	}

	group, ctx := errgroup.WithContext(compute.WithAllocator(ctx, alloc))
	ctx, cancelFn := context.WithCancel(ctx)

	schema, recTransform := getTransformer(rr.Schema(), ld)

	// If a later step in this function fails, tear down what we set up.
	// Note: this reads `err` at return time on this goroutine, so worker
	// goroutines must never assign to the captured `err`.
	defer func() {
		if err != nil {
			close(ch)
			cancelFn()
		}
	}()

	group.Go(func() error {
		defer rr.Release()
		defer r.Close()
		// With a single batch, ch doubles as the last channel and is
		// closed by the group.Wait goroutine below instead.
		if len(batches) > 1 {
			defer close(ch)
		}

		for rr.Next() && ctx.Err() == nil {
			rec := rr.Record()
			// Use a closure-local error: assigning to the captured
			// `err` would race with the deferred cleanup above, which
			// reads `err` on the caller's goroutine.
			rec, tErr := recTransform(ctx, rec)
			if tErr != nil {
				return tErr
			}
			// NOTE(review): a send blocked here does not observe
			// cancellation; presumably the reader's Release drains
			// the channels — verify against the reader type.
			ch <- rec
		}
		return rr.Err()
	})

	chs := make([]chan arrow.Record, len(batches))
	chs[0] = ch
	rdr := &reader{
		refCount: 1,
		chs:      chs,
		err:      nil,
		cancelFn: cancelFn,
		schema:   schema,
	}

	lastChannelIndex := len(chs) - 1
	// Kick off the remaining batches in the background so the reader can be
	// returned immediately.
	go func() {
		for i, b := range batches[1:] {
			batch, batchIdx := b, i+1 // capture per-iteration values for the closure
			chs[batchIdx] = make(chan arrow.Record, bufferSize)
			group.Go(func() error {
				// close channels (except the last) so that Next can move on to the next channel properly
				if batchIdx != lastChannelIndex {
					defer close(chs[batchIdx])
				}

				rdr, err := batch.GetStream(ctx)
				if err != nil {
					return err
				}
				defer rdr.Close()

				rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
				if err != nil {
					return err
				}
				defer rr.Release()

				for rr.Next() && ctx.Err() == nil {
					rec := rr.Record()
					// `err` here is the closure-local declared above,
					// so this assignment is race-free.
					rec, err = recTransform(ctx, rec)
					if err != nil {
						return err
					}
					chs[batchIdx] <- rec
				}

				return rr.Err()
			})
		}
	}()

	go func() {
		rdr.err = group.Wait()
		// don't close the last channel until after the group is finished,
		// so that Next() can only return after reader.err may have been set
		close(chs[lastChannelIndex])
	}()

	return rdr, nil
}