in arrow/flight/client.go [62:159]
// CreateClientMiddleware adapts a CustomClientMiddleware into a ClientMiddleware
// made of a unary and a stream gRPC client interceptor.
//
// For every call, middleware.StartCall is invoked first; when it returns a
// non-nil context, that context replaces the one used for the call. If the
// middleware additionally implements ClientHeadersMiddleware, HeadersReceived
// is called with the joined response headers and trailers. If it implements
// ClientPostCallMiddleware, CallCompleted is called with the error (possibly
// nil) the call finished with.
func CreateClientMiddleware(middleware CustomClientMiddleware) ClientMiddleware {
	return ClientMiddleware{
		Unary: func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
			nctx := middleware.StartCall(ctx)
			if nctx != nil {
				ctx = nctx
			}

			if hdrs, ok := middleware.(ClientHeadersMiddleware); ok {
				hdrmd := make(metadata.MD)
				trailermd := make(metadata.MD)
				// Ask gRPC to fill these in during the call; they are handed to
				// the middleware once this interceptor returns.
				opts = append(opts, grpc.Header(&hdrmd), grpc.Trailer(&trailermd))
				defer func() {
					hdrs.HeadersReceived(ctx, metadata.Join(hdrmd, trailermd))
				}()
			}

			err := invoker(ctx, method, req, reply, cc, opts...)
			if post, ok := middleware.(ClientPostCallMiddleware); ok {
				post.CallCompleted(ctx, err)
			}
			return err
		},
		Stream: func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
			nctx := middleware.StartCall(ctx)
			if nctx != nil {
				ctx = nctx
			}

			cs, err := streamer(ctx, desc, cc, method, opts...)
			hdrs, isHdrs := middleware.(ClientHeadersMiddleware)
			post, isPostcall := middleware.(ClientPostCallMiddleware)
			if !isPostcall && !isHdrs {
				// Nothing to notify after the call, so no wrapping is needed.
				return cs, err
			}

			if err != nil {
				if isHdrs {
					// When creating the stream fails, the streamer normally
					// returns a nil stream; calling Header/Trailer on it would
					// panic, so report empty metadata in that case.
					var hdrmd, trailermd metadata.MD
					if cs != nil {
						// Header's error is deliberately ignored: the failure is
						// already reported to the caller through err.
						hdrmd, _ = cs.Header()
						trailermd = cs.Trailer()
					}
					hdrs.HeadersReceived(ctx, metadata.Join(hdrmd, trailermd))
				}
				if isPostcall {
					post.CallCompleted(ctx, err)
				}
				return cs, err
			}

			// Grab the client stream context now: by the time the finish
			// function or the goroutine below runs, it's not guaranteed that
			// cs.Context() will still be valid.
			csCtx := cs.Context()
			finishChan := make(chan struct{})
			// 0 until the first call to finishFunc, 1 afterwards.
			isFinished := new(int32)
			finishFunc := func(err error) {
				// Several code paths can call finishFunc (the wrapping stream,
				// the context watcher goroutine and the finalizer), so make
				// sure the callbacks only ever fire once.
				if !atomic.CompareAndSwapInt32(isFinished, 0, 1) {
					return
				}
				// Release the context watcher goroutine.
				close(finishChan)
				if isPostcall {
					post.CallCompleted(csCtx, err)
				}
				if isHdrs {
					// Header's error is ignored; err above already carries the
					// outcome of the call.
					hdrmd, _ := cs.Header()
					hdrs.HeadersReceived(csCtx, metadata.Join(hdrmd, cs.Trailer()))
				}
			}

			// Finish the call when the stream's context ends, unless something
			// else finished it first.
			go func() {
				select {
				case <-finishChan:
					// finish is being called by something else, no action necessary
				case <-csCtx.Done():
					finishFunc(csCtx.Err())
				}
			}()

			newCS := &clientStream{
				ClientStream: cs,
				desc:         desc,
				finishFn:     finishFunc,
			}

			// The `ClientStream` interface allows one to omit calling `Recv` if
			// it's known that the result will be `io.EOF`. See
			// http://stackoverflow.com/q/42915337
			// In such cases, nothing else triggers the finish function. We,
			// therefore, set a finalizer so that the middleware is notified and
			// the context goroutine is cleaned up at least when the garbage
			// collector reclaims the wrapping stream.
			runtime.SetFinalizer(newCS, func(newcs *clientStream) {
				newcs.finishFn(nil)
			})
			return newCS, nil
		},
	}
}