in grpcgcp/gcp_multiendpoint.go [296:377]
func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOptions) error {
gme.mu.Lock()
defer gme.mu.Unlock()
if _, ok := meOpts.MultiEndpoints[meOpts.Default]; !ok {
return fmt.Errorf("default MultiEndpoint %q missing options", meOpts.Default)
}
validPools := make(map[string]bool)
for _, meo := range meOpts.MultiEndpoints {
for _, e := range meo.Endpoints {
validPools[e] = true
}
}
// Add missing pools.
for e := range validPools {
if _, ok := gme.pools[e]; !ok {
// This creates a ClientConn with the gRPC-GCP balancer managing connection pool.
conn, err := gme.dialFunc(context.Background(), e, gme.opts...)
if err != nil {
return err
}
if gme.log.V(FINE) {
gme.log.Infof("created new channel pool for %q endpoint.", e)
}
gme.pools[e] = newMonitoredConn(e, conn, gme)
}
}
// Add new multi-endpoints and update existing.
for name, meo := range meOpts.MultiEndpoints {
if me, ok := gme.mes[name]; ok {
// Updating existing MultiEndpoint.
me.SetEndpoints(meo.Endpoints)
continue
}
// Add new MultiEndpoint.
if gme.log.V(FINE) {
gme.log.Infof("creating new %q multiendpoint.", name)
}
me, err := multiendpoint.NewMultiEndpoint(meo)
if err != nil {
return err
}
gme.mes[name] = me
}
gme.defaultName = meOpts.Default
// Remove obsolete MultiEndpoints.
for name := range gme.mes {
if _, ok := meOpts.MultiEndpoints[name]; !ok {
delete(gme.mes, name)
if gme.log.V(FINE) {
gme.log.Infof("removed obsolete %q multiendpoint.", name)
}
}
}
// Remove obsolete pools.
for e, mc := range gme.pools {
if _, ok := validPools[e]; !ok {
if err := mc.conn.Close(); err != nil {
gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
}
if gme.log.V(FINE) {
gme.log.Infof("closed channel pool for %q endpoint.", e)
}
mc.stopMonitoring()
delete(gme.pools, e)
}
}
// Trigger status update.
for e, mc := range gme.pools {
s := mc.conn.GetState()
for _, me := range gme.mes {
me.SetEndpointAvailability(e, s == connectivity.Ready)
}
}
return nil
}