func()

in arrow/internal/flight_integration/scenario.go [119:219]


func (s *defaultIntegrationTester) RunClient(addr string, opts ...grpc.DialOption) error {
	client, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...)
	if err != nil {
		return err
	}
	defer client.Close()

	ctx := context.Background()

	arrow.RegisterExtensionType(types.NewUUIDType())
	defer arrow.UnregisterExtensionType("uuid")

	descr := &flight.FlightDescriptor{
		Type: flight.FlightDescriptor_PATH,
		Path: []string{s.path},
	}

	fmt.Println("Opening JSON file '", s.path, "'")
	r, err := os.Open(s.path)
	if err != nil {
		return xerrors.Errorf("could not open JSON file: %q: %w", s.path, err)
	}

	rdr, err := arrjson.NewReader(r)
	if err != nil {
		return xerrors.Errorf("could not create JSON file reader from file: %q: %w", s.path, err)
	}

	dataSet := integrationDataSet{
		chunks: make([]array.Record, 0),
		schema: rdr.Schema(),
	}

	for {
		rec, err := rdr.Read()
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		defer rec.Release()
		dataSet.chunks = append(dataSet.chunks, rec)
	}

	stream, err := client.DoPut(ctx)
	if err != nil {
		return err
	}

	wr := flight.NewRecordWriter(stream, ipc.WithSchema(dataSet.schema))
	wr.SetFlightDescriptor(descr)

	for i, rec := range dataSet.chunks {
		metadata := []byte(strconv.Itoa(i))
		if err := wr.WriteWithAppMetadata(rec, metadata); err != nil {
			return err
		}

		pr, err := stream.Recv()
		if err != nil {
			return err
		}

		acked := pr.GetAppMetadata()
		switch {
		case len(acked) == 0:
			return xerrors.Errorf("expected metadata value: %s, but got nothing.", string(metadata))
		case !bytes.Equal(metadata, acked):
			return xerrors.Errorf("expected metadata value: %s, but got: %s", string(metadata), string(acked))
		}
	}

	wr.Close()

	if err := stream.CloseSend(); err != nil {
		return err
	}

	info, err := client.GetFlightInfo(ctx, descr)
	if err != nil {
		return err
	}

	if len(info.Endpoint) == 0 {
		fmt.Fprintln(os.Stderr, "no endpoints returned from flight server.")
		return xerrors.Errorf("no endpoints returned from flight server")
	}

	for _, ep := range info.Endpoint {
		if len(ep.Location) == 0 {
			return xerrors.Errorf("no locations returned from flight server")
		}

		for _, loc := range ep.Location {
			consumeFlightLocation(ctx, loc, ep.Ticket, dataSet.chunks, opts...)
		}
	}

	return nil
}