func CreateClientMiddleware()

in arrow/flight/client.go [62:159]


func CreateClientMiddleware(middleware CustomClientMiddleware) ClientMiddleware {
	return ClientMiddleware{
		Unary: func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
			nctx := middleware.StartCall(ctx)
			if nctx != nil {
				ctx = nctx
			}

			if hdrs, ok := middleware.(ClientHeadersMiddleware); ok {
				hdrmd := make(metadata.MD)
				trailermd := make(metadata.MD)
				opts = append(opts, grpc.Header(&hdrmd), grpc.Trailer(&trailermd))
				defer func() {
					hdrs.HeadersReceived(ctx, metadata.Join(hdrmd, trailermd))
				}()
			}

			err := invoker(ctx, method, req, reply, cc, opts...)
			if post, ok := middleware.(ClientPostCallMiddleware); ok {
				post.CallCompleted(ctx, err)
			}
			return err
		},
		Stream: func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
			nctx := middleware.StartCall(ctx)
			if nctx != nil {
				ctx = nctx
			}

			cs, err := streamer(ctx, desc, cc, method, opts...)
			hdrs, isHdrs := middleware.(ClientHeadersMiddleware)
			post, isPostcall := middleware.(ClientPostCallMiddleware)
			if !isPostcall && !isHdrs {
				return cs, err
			}

			if err != nil {
				if isHdrs {
					md, _ := cs.Header()
					hdrs.HeadersReceived(ctx, metadata.Join(md, cs.Trailer()))
				}
				if isPostcall {
					post.CallCompleted(ctx, err)
				}
				return cs, err
			}

			// Grab the client stream context because when the finish function or the goroutine below will be
			// executed it's not guaranteed cs.Context() will be valid.
			csCtx := cs.Context()
			finishChan := make(chan struct{})
			isFinished := new(int32)
			*isFinished = 0
			finishFunc := func(err error) {

				// since there are multiple code paths that could call finishFunc
				// we need some sort of synchronization to guard against multiple
				// calls to finish
				if !atomic.CompareAndSwapInt32(isFinished, 0, 1) {
					return
				}

				close(finishChan)
				if isPostcall {
					post.CallCompleted(csCtx, err)
				}
				if isHdrs {
					hdrmd, _ := cs.Header()
					hdrs.HeadersReceived(csCtx, metadata.Join(hdrmd, cs.Trailer()))
				}
			}
			go func() {
				select {
				case <-finishChan:
					// finish is being called by something else, no action necessary
				case <-csCtx.Done():
					finishFunc(csCtx.Err())
				}
			}()

			newCS := &clientStream{
				ClientStream: cs,
				desc:         desc,
				finishFn:     finishFunc,
			}
			// The `ClientStream` interface allows one to omit calling `Recv` if it's
			// known that the result will be `io.EOF`. See
			// http://stackoverflow.com/q/42915337
			// In such cases, there's nothing that triggers the span to finish. We,
			// therefore, set a finalizer so that the span and the context goroutine will
			// at least be cleaned up when the garbage collector is run.
			runtime.SetFinalizer(newCS, func(newcs *clientStream) {
				newcs.finishFn(nil)
			})
			return newCS, nil
		},
	}
}