grpcgcp/multiendpoint/multiendpoint.go (182 lines of code) (raw):

/* * * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ // Package multiendpoint implements multiendpoint feature. See [MultiEndpoint] package multiendpoint import ( "errors" "fmt" "sync" "time" ) type timerAlike interface { Reset(time.Duration) bool Stop() bool } // To be redefined in tests. var ( timeNow = func() time.Time { return time.Now() } timeAfterFunc = func(d time.Duration, f func()) timerAlike { return time.AfterFunc(d, f) } ) // MultiEndpoint holds a list of endpoints, tracks their availability and defines the current // endpoint. An endpoint has a priority defined by its position in the list (first item has top // priority). // // The current endpoint is the highest available endpoint in the list. If no endpoint is available, // MultiEndpoint sticks to the previously current endpoint. // // Sometimes switching between endpoints can be costly, and it is worth waiting for some time // after current endpoint becomes unavailable. For this case, use // [MultiEndpointOptions.RecoveryTimeout] to set the recovery timeout. MultiEndpoint will keep the // current endpoint for up to recovery timeout after it became unavailable to give it some time to // recover. // // The list of endpoints can be changed at any time with [MultiEndpoint.SetEndpoints] function. // MultiEndpoint will: // - remove obsolete endpoints; // - preserve remaining endpoints and their states; // - add new endpoints; // - update all endpoints priority according to the new order; // - change current endpoint if necessary. // // After updating the list of endpoints, MultiEndpoint will switch the current endpoint to the // highest available endpoint in the list. If you have many processes using MultiEndpoint, this may // lead to immediate shift of all traffic which may be undesired. To smooth this transfer, use // [MultiEndpointOptions.SwitchingDelay] with randomized value to introduce a jitter. Each // MultiEndpoint will delay switching from an available endpoint to another endpoint for this amount // of time. This delay is only applicable when switching from a lower priority available endpoint to // a higher priority available endpoint. type MultiEndpoint interface { // Current returns current endpoint. // // Note that the read is not synchronized and in case of a race condition there is a chance of // getting an outdated current endpoint. Current() string // SetEndpointAvailability informs MultiEndpoint when an endpoint becomes available or unavailable. // This may change the current endpoint. SetEndpointAvailability(e string, avail bool) // SetEndpoints updates a list of endpoints: // - remove obsolete endpoints // - preserve remaining endpoints and their states // - add new endpoints // - update all endpoints priority according to the new order // This may change the current endpoint. SetEndpoints(endpoints []string) error } // MultiEndpointOptions is used for configuring [MultiEndpoint]. type MultiEndpointOptions struct { // A list of endpoints ordered by priority (first endpoint has top priority). Endpoints []string // RecoveryTimeout sets the amount of time MultiEndpoint keeps endpoint as current after it // became unavailable. RecoveryTimeout time.Duration // When switching from a lower priority available endpoint to a higher priority available // endpoint the MultiEndpoint will delay the switch for this duration. SwitchingDelay time.Duration } // NewMultiEndpoint validates options and creates a new [MultiEndpoint]. func NewMultiEndpoint(b *MultiEndpointOptions) (MultiEndpoint, error) { if len(b.Endpoints) == 0 { return nil, fmt.Errorf("endpoints list cannot be empty") } me := &multiEndpoint{ recoveryTimeout: b.RecoveryTimeout, switchingDelay: b.SwitchingDelay, current: b.Endpoints[0], } eMap := make(map[string]*endpoint) for i, e := range b.Endpoints { eMap[e] = me.newEndpoint(e, i) } me.endpoints = eMap return me, nil } type multiEndpoint struct { sync.RWMutex endpoints map[string]*endpoint recoveryTimeout time.Duration switchingDelay time.Duration current string future string } // Current returns current endpoint. func (me *multiEndpoint) Current() string { me.RLock() defer me.RUnlock() return me.current } // SetEndpoints updates endpoints list: // - remove obsolete endpoints; // - preserve remaining endpoints and their states; // - add new endpoints; // - update all endpoints priority according to the new order; // - change current endpoint if necessary. func (me *multiEndpoint) SetEndpoints(endpoints []string) error { me.Lock() defer me.Unlock() if len(endpoints) == 0 { return errors.New("endpoints list cannot be empty") } newEndpoints := make(map[string]struct{}) for _, v := range endpoints { newEndpoints[v] = struct{}{} } // Remove obsolete endpoints. for e := range me.endpoints { if _, ok := newEndpoints[e]; !ok { delete(me.endpoints, e) } } // Add new endpoints and update priority. for i, e := range endpoints { if _, ok := me.endpoints[e]; !ok { me.endpoints[e] = me.newEndpoint(e, i) } else { me.endpoints[e].priority = i } } me.maybeUpdateCurrent() return nil } // Updates current to the top-priority available endpoint unless the current endpoint is // recovering. // // Must be run under me.Lock. func (me *multiEndpoint) maybeUpdateCurrent() { c, exists := me.endpoints[me.current] var topA *endpoint var top *endpoint for _, e := range me.endpoints { if e.status == available && (topA == nil || topA.priority > e.priority) { topA = e } if top == nil || top.priority > e.priority { top = e } } if exists && c.status == recovering && (topA == nil || topA.priority > c.priority) { // Let current endpoint recover while no higher priority endpoints available. return } // Always prefer top available endpoint. if topA != nil { me.switchFromTo(c, topA) return } // If no current endpoint exists, resort to the top priority endpoint immediately. if !exists { me.current = top.id } } func (me *multiEndpoint) newEndpoint(id string, priority int) *endpoint { s := unavailable if me.recoveryTimeout > 0 { s = recovering } e := &endpoint{ id: id, priority: priority, status: s, } if e.status == recovering { me.scheduleUnavailable(e) } return e } // Changes or schedules a change of current to the endpoint t. // // Must be run under me.Lock. func (me *multiEndpoint) switchFromTo(f, t *endpoint) { if me.current == t.id { return } if me.switchingDelay == 0 || f == nil || f.status == unavailable { // Switching immediately if no delay or no current or current is unavailable. me.current = t.id return } me.future = t.id timeAfterFunc(me.switchingDelay, func() { me.Lock() defer me.Unlock() if e, ok := me.endpoints[me.future]; ok && e.status == available { me.current = e.id } }) } // SetEndpointAvailability updates the state of an endpoint. func (me *multiEndpoint) SetEndpointAvailability(e string, avail bool) { me.Lock() defer me.Unlock() me.setEndpointAvailability(e, avail) me.maybeUpdateCurrent() } // Must be run under me.Lock. func (me *multiEndpoint) setEndpointAvailability(e string, avail bool) { ee, ok := me.endpoints[e] if !ok { return } if avail { setState(ee, available) return } if ee.status != available { return } if me.recoveryTimeout == 0 { setState(ee, unavailable) return } setState(ee, recovering) me.scheduleUnavailable(ee) } // Change the state of endpoint e to state s. // // Must be run under me.Lock. func setState(e *endpoint, s status) { if e.futureChange != nil { e.futureChange.Stop() } e.status = s e.lastChange = timeNow() } // Schedule endpoint e to become unavailable after recoveryTimeout. func (me *multiEndpoint) scheduleUnavailable(e *endpoint) { stateChange := e.lastChange e.futureChange = timeAfterFunc(me.recoveryTimeout, func() { me.Lock() defer me.Unlock() if e.lastChange != stateChange { // This timer is outdated. return } setState(e, unavailable) me.maybeUpdateCurrent() }) }