grpcgcp/gcp_multiendpoint.go (247 lines of code) (raw):

/* * * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package grpcgcp import ( "context" "fmt" "sync" "sync/atomic" "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" ) var gmeCounter uint32 type contextMEKey int var meKey contextMEKey // NewMEContext returns a new Context that carries Multiendpoint name. func NewMEContext(ctx context.Context, name string) context.Context { return context.WithValue(ctx, meKey, name) } // FromMEContext returns the MultiEndpoint name stored in ctx, if any. func FromMEContext(ctx context.Context) (string, bool) { name, ok := ctx.Value(meKey).(string) return name, ok } // GCPMultiEndpoint holds the state of MultiEndpoints-enabled gRPC client connection. // // The purposes of GCPMultiEndpoint are: // // - Fallback to an alternative endpoint (host:port) of a gRPC service when the original // endpoint is completely unavailable. // - Be able to route an RPC call to a specific group of endpoints. // - Be able to reconfigure endpoints in runtime. // // A group of endpoints is called a [multiendpoint.MultiEndpoint] and is essentially a list of endpoints // where priority is defined by the position in the list with the first endpoint having top // priority. A MultiEndpoint tracks endpoints' availability. When a MultiEndpoint is picked for an // RPC call, it picks the top priority endpoint that is currently available. More information on the // [multiendpoint.MultiEndpoint]. // // GCPMultiEndpoint can have one or more MultiEndpoint identified by its name -- arbitrary // string provided in the [GCPMultiEndpointOptions] when configuring MultiEndpoints. This name // can be used to route an RPC call to this MultiEndpoint by using the [NewMEContext]. // // GCPMultiEndpoint uses [GCPMultiEndpointOptions] for initial configuration. // An updated configuration can be provided at any time later using [UpdateMultiEndpoints]. // // Example: // // Let's assume we have a service with read and write operations and the following backends: // // - service.example.com -- the main set of backends supporting all operations // - service-fallback.example.com -- read-write replica supporting all operations // - ro-service.example.com -- read-only replica supporting only read operations // // Example configuration: // // - MultiEndpoint named "default" with endpoints: // // 1. service.example.com:443 // // 2. service-fallback.example.com:443 // // - MultiEndpoint named "read" with endpoints: // // 1. ro-service.example.com:443 // // 2. service-fallback.example.com:443 // // 3. service.example.com:443 // // With the configuration above GCPMultiEndpoint will use the "default" MultiEndpoint by // default. It means that RPC calls by default will use the main endpoint and if it is not available // then the read-write replica. // // To offload some read calls to the read-only replica we can specify "read" MultiEndpoint in the // context. Then these calls will use the read-only replica endpoint and if it is not available // then the read-write replica and if it is also not available then the main endpoint. // // GCPMultiEndpoint creates a [grpcgcp] connection pool for every unique // endpoint. For the example above three connection pools will be created. // // [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used // as a [grpc.ClientConn] when creating gRPC clients. type GCPMultiEndpoint struct { mu sync.RWMutex defaultName string mes map[string]multiendpoint.MultiEndpoint pools map[string]*monitoredConn opts []grpc.DialOption gcpConfig *pb.ApiConfig dialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) log grpclog.LoggerV2 grpc.ClientConnInterface } // Make sure GCPMultiEndpoint implements grpc.ClientConnInterface. var _ grpc.ClientConnInterface = (*GCPMultiEndpoint)(nil) func (gme *GCPMultiEndpoint) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error { return gme.pickConn(ctx).Invoke(ctx, method, args, reply, opts...) } func (gme *GCPMultiEndpoint) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { return gme.pickConn(ctx).NewStream(ctx, desc, method, opts...) } func (gme *GCPMultiEndpoint) pickConn(ctx context.Context) *grpc.ClientConn { name, ok := FromMEContext(ctx) me, ook := gme.mes[name] if !ok || !ook { me = gme.mes[gme.defaultName] } return gme.pools[me.Current()].conn } func (gme *GCPMultiEndpoint) Close() error { var errs multiError for e, mc := range gme.pools { mc.stopMonitoring() if err := mc.conn.Close(); err != nil { errs = append(errs, err) gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err) } if gme.log.V(FINE) { gme.log.Infof("closed channel pool for %q endpoint.", e) } } return errs.Combine() } func (gme *GCPMultiEndpoint) GCPConfig() *pb.ApiConfig { return proto.Clone(gme.gcpConfig).(*pb.ApiConfig) } // GCPMultiEndpointOptions holds options to construct a MultiEndpoints-enabled gRPC client // connection. type GCPMultiEndpointOptions struct { // Regular gRPC-GCP configuration to be applied to every endpoint. GRPCgcpConfig *pb.ApiConfig // Map of MultiEndpoints where key is the MultiEndpoint name. MultiEndpoints map[string]*multiendpoint.MultiEndpointOptions // Name of the default MultiEndpoint. Default string // Func to dial grpc ClientConn. DialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) } // NewGCPMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client // connection. // // Deprecated: use NewGCPMultiEndpoint. func NewGcpMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) { return NewGCPMultiEndpoint(meOpts, opts...) } // NewGCPMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client // connection. // // [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used // as a [grpc.ClientConn] when creating gRPC clients. func NewGCPMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) { // Read config, create multiendpoints and pools. o, err := makeOpts(meOpts, opts) if err != nil { return nil, err } gme := &GCPMultiEndpoint{ mes: make(map[string]multiendpoint.MultiEndpoint), pools: make(map[string]*monitoredConn), defaultName: meOpts.Default, opts: o, gcpConfig: proto.Clone(meOpts.GRPCgcpConfig).(*pb.ApiConfig), dialFunc: meOpts.DialFunc, log: NewGCPLogger(compLogger, fmt.Sprintf("[GCPMultiEndpoint #%d]", atomic.AddUint32(&gmeCounter, 1))), } if gme.dialFunc == nil { gme.dialFunc = func(_ context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { return grpc.Dial(target, opts...) } } if err := gme.UpdateMultiEndpoints(meOpts); err != nil { return nil, err } return gme, nil } func makeOpts(meOpts *GCPMultiEndpointOptions, opts []grpc.DialOption) ([]grpc.DialOption, error) { grpcGCPjsonConfig, err := protojson.Marshal(meOpts.GRPCgcpConfig) if err != nil { return nil, err } o := append([]grpc.DialOption{}, opts...) o = append(o, []grpc.DialOption{ grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, Name, string(grpcGCPjsonConfig))), grpc.WithChainUnaryInterceptor(GCPUnaryClientInterceptor), grpc.WithChainStreamInterceptor(GCPStreamClientInterceptor), }...) return o, nil } type monitoredConn struct { endpoint string conn *grpc.ClientConn gme *GCPMultiEndpoint cancel context.CancelFunc } func newMonitoredConn(endpoint string, conn *grpc.ClientConn, gme *GCPMultiEndpoint) (mc *monitoredConn) { ctx, cancel := context.WithCancel(context.Background()) mc = &monitoredConn{ endpoint: endpoint, conn: conn, gme: gme, cancel: cancel, } go mc.monitor(ctx) return } func (mc *monitoredConn) notify(state connectivity.State) { if mc.gme.log.V(FINE) { mc.gme.log.Infof("%q endpoint state changed to %v", mc.endpoint, state) } // Inform all multiendpoints. mc.gme.mu.RLock() for _, me := range mc.gme.mes { me.SetEndpointAvailability(mc.endpoint, state == connectivity.Ready) } mc.gme.mu.RUnlock() } func (mc *monitoredConn) monitor(ctx context.Context) { for { currentState := mc.conn.GetState() mc.notify(currentState) if !mc.conn.WaitForStateChange(ctx, currentState) { break } } } func (mc *monitoredConn) stopMonitoring() { mc.cancel() } // UpdateMultiEndpoints reconfigures MultiEndpoints. // // MultiEndpoints are matched with the current ones by name. // // - If a current MultiEndpoint is missing in the updated list, the MultiEndpoint will be // removed. // - A new MultiEndpoint will be created for every new name in the list. // - For an existing MultiEndpoint only its endpoints will be updated (no recovery timeout // change). // // Endpoints are matched by the endpoint address (usually in the form of address:port). // // - If an existing endpoint is not used by any MultiEndpoint in the updated list, then the // connection poll for this endpoint will be shutdown. // - A connection pool will be created for every new endpoint. // - For an existing endpoint nothing will change (the connection pool will not be re-created, // thus no connection credentials change, nor connection configuration change). func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOptions) error { gme.mu.Lock() defer gme.mu.Unlock() if _, ok := meOpts.MultiEndpoints[meOpts.Default]; !ok { return fmt.Errorf("default MultiEndpoint %q missing options", meOpts.Default) } validPools := make(map[string]bool) for _, meo := range meOpts.MultiEndpoints { for _, e := range meo.Endpoints { validPools[e] = true } } // Add missing pools. for e := range validPools { if _, ok := gme.pools[e]; !ok { // This creates a ClientConn with the gRPC-GCP balancer managing connection pool. conn, err := gme.dialFunc(context.Background(), e, gme.opts...) if err != nil { return err } if gme.log.V(FINE) { gme.log.Infof("created new channel pool for %q endpoint.", e) } gme.pools[e] = newMonitoredConn(e, conn, gme) } } // Add new multi-endpoints and update existing. for name, meo := range meOpts.MultiEndpoints { if me, ok := gme.mes[name]; ok { // Updating existing MultiEndpoint. me.SetEndpoints(meo.Endpoints) continue } // Add new MultiEndpoint. if gme.log.V(FINE) { gme.log.Infof("creating new %q multiendpoint.", name) } me, err := multiendpoint.NewMultiEndpoint(meo) if err != nil { return err } gme.mes[name] = me } gme.defaultName = meOpts.Default // Remove obsolete MultiEndpoints. for name := range gme.mes { if _, ok := meOpts.MultiEndpoints[name]; !ok { delete(gme.mes, name) if gme.log.V(FINE) { gme.log.Infof("removed obsolete %q multiendpoint.", name) } } } // Remove obsolete pools. for e, mc := range gme.pools { if _, ok := validPools[e]; !ok { if err := mc.conn.Close(); err != nil { gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err) } if gme.log.V(FINE) { gme.log.Infof("closed channel pool for %q endpoint.", e) } mc.stopMonitoring() delete(gme.pools, e) } } // Trigger status update. for e, mc := range gme.pools { s := mc.conn.GetState() for _, me := range gme.mes { me.SetEndpointAvailability(e, s == connectivity.Ready) } } return nil } type multiError []error func (m multiError) Error() string { s, n := "", 0 for _, e := range m { if e != nil { if n == 0 { s = e.Error() } n++ } } switch n { case 0: return "(0 errors)" case 1: return s case 2: return s + " (and 1 other error)" } return fmt.Sprintf("%s (and %d other errors)", s, n-1) } func (m multiError) Combine() error { if len(m) == 0 { return nil } return m }