in arrow/internal/flight_integration/scenario.go [119:219]
func (s *defaultIntegrationTester) RunClient(addr string, opts ...grpc.DialOption) error {
client, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...)
if err != nil {
return err
}
defer client.Close()
ctx := context.Background()
arrow.RegisterExtensionType(types.NewUUIDType())
defer arrow.UnregisterExtensionType("uuid")
descr := &flight.FlightDescriptor{
Type: flight.FlightDescriptor_PATH,
Path: []string{s.path},
}
fmt.Println("Opening JSON file '", s.path, "'")
r, err := os.Open(s.path)
if err != nil {
return xerrors.Errorf("could not open JSON file: %q: %w", s.path, err)
}
rdr, err := arrjson.NewReader(r)
if err != nil {
return xerrors.Errorf("could not create JSON file reader from file: %q: %w", s.path, err)
}
dataSet := integrationDataSet{
chunks: make([]array.Record, 0),
schema: rdr.Schema(),
}
for {
rec, err := rdr.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
defer rec.Release()
dataSet.chunks = append(dataSet.chunks, rec)
}
stream, err := client.DoPut(ctx)
if err != nil {
return err
}
wr := flight.NewRecordWriter(stream, ipc.WithSchema(dataSet.schema))
wr.SetFlightDescriptor(descr)
for i, rec := range dataSet.chunks {
metadata := []byte(strconv.Itoa(i))
if err := wr.WriteWithAppMetadata(rec, metadata); err != nil {
return err
}
pr, err := stream.Recv()
if err != nil {
return err
}
acked := pr.GetAppMetadata()
switch {
case len(acked) == 0:
return xerrors.Errorf("expected metadata value: %s, but got nothing.", string(metadata))
case !bytes.Equal(metadata, acked):
return xerrors.Errorf("expected metadata value: %s, but got: %s", string(metadata), string(acked))
}
}
wr.Close()
if err := stream.CloseSend(); err != nil {
return err
}
info, err := client.GetFlightInfo(ctx, descr)
if err != nil {
return err
}
if len(info.Endpoint) == 0 {
fmt.Fprintln(os.Stderr, "no endpoints returned from flight server.")
return xerrors.Errorf("no endpoints returned from flight server")
}
for _, ep := range info.Endpoint {
if len(ep.Location) == 0 {
return xerrors.Errorf("no locations returned from flight server")
}
for _, loc := range ep.Location {
consumeFlightLocation(ctx, loc, ep.Ticket, dataSet.chunks, opts...)
}
}
return nil
}