grpcgcp/gcp_interceptor.go (88 lines of code) (raw):

/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package grpcgcp import ( "context" "sync" "google.golang.org/grpc" ) type key int var gcpKey key type gcpContext struct { // request message used for pre-process of an affinity call reqMsg interface{} // response message used for post-process of an affinity call replyMsg interface{} } // GCPUnaryClientInterceptor intercepts the execution of a unary RPC // and injects necessary information to be used by the picker. func GCPUnaryClientInterceptor( ctx context.Context, method string, req interface{}, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { gcpCtx := &gcpContext{ reqMsg: req, replyMsg: reply, } ctx = context.WithValue(ctx, gcpKey, gcpCtx) return invoker(ctx, method, req, reply, cc, opts...) } // GCPStreamClientInterceptor intercepts the execution of a client streaming RPC // and injects necessary information to be used by the picker. func GCPStreamClientInterceptor( ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { // This constructor does not create a real ClientStream, // it only stores all parameters and let SendMsg() to create ClientStream. cs := &gcpClientStream{ ctx: ctx, desc: desc, cc: cc, method: method, streamer: streamer, opts: opts, } cs.cond = sync.NewCond(cs) return cs, nil } type gcpClientStream struct { sync.Mutex grpc.ClientStream cond *sync.Cond initStreamErr error ctx context.Context desc *grpc.StreamDesc cc *grpc.ClientConn method string streamer grpc.Streamer opts []grpc.CallOption } func (cs *gcpClientStream) SendMsg(m interface{}) error { cs.Lock() // Initialize underlying ClientStream when getting the first request. if cs.ClientStream == nil { ctx := context.WithValue(cs.ctx, gcpKey, &gcpContext{reqMsg: m}) realCS, err := cs.streamer(ctx, cs.desc, cs.cc, cs.method, cs.opts...) if err != nil { cs.initStreamErr = err cs.Unlock() cs.cond.Broadcast() return err } cs.ClientStream = realCS } cs.Unlock() cs.cond.Broadcast() return cs.ClientStream.SendMsg(m) } func (cs *gcpClientStream) RecvMsg(m interface{}) error { // If RecvMsg is called before SendMsg, it should wait until cs.ClientStream // is initialized or the initialization failed. cs.Lock() for cs.initStreamErr == nil && cs.ClientStream == nil { cs.cond.Wait() } if cs.initStreamErr != nil { cs.Unlock() return cs.initStreamErr } cs.Unlock() return cs.ClientStream.RecvMsg(m) }