in dax/internal/client/cluster.go [570:638]
// update reconciles the cluster's active clients and routes with the endpoint
// roster given in config.
//
// Clients already present in c.active are reused; a new client is created for
// every endpoint that is not yet tracked, and health checks are started on it
// when it implements HealthCheckDaxAPI. The update is all-or-nothing:
//
//   - On success c.active and the route manager are replaced, and clients for
//     endpoints that dropped out of the roster are closed.
//   - If creating any client fails, c.active and the routes are left untouched,
//     only the clients created by this call are closed, and the creation error
//     is returned. Previously tracked clients are NOT closed in this case,
//     because they are still referenced by c.active and the current routes.
//   - If the cluster is already closed, nothing is changed and nil is returned.
//
// Clients are closed on a separate goroutine, after c.lock has been released.
func (c *cluster) update(config []serviceEndpoint) error {
	newEndpoints := make(map[hostPort]struct{}, len(config))
	for _, cfg := range config {
		newEndpoints[cfg.hostPort()] = struct{}{}
	}

	newActive := make(map[hostPort]clientAndConfig, len(config))
	newRoutes := make([]DaxAPI, len(config))
	var toClose []clientAndConfig
	// Track the newly created client instances, so that we can clean them up in case of partial failures.
	var newCliCfg []clientAndConfig
	var updateErr error

	c.lock.Lock()
	if !c.closed {
		oldActive := c.active

		// Create client instances for the new endpoints in roster.
		for i, ep := range config {
			hp := ep.hostPort()
			cliAndCfg, alreadyExists := oldActive[hp]
			if !alreadyExists {
				cli, err := c.newSingleClient(ep)
				if err != nil {
					// Abandon the whole update; the state is committed only
					// when every endpoint has a client.
					updateErr = err
					break
				}
				cliAndCfg = clientAndConfig{client: cli, cfg: ep}
				newCliCfg = append(newCliCfg, cliAndCfg)
				if singleCli, ok := cli.(HealthCheckDaxAPI); ok {
					singleCli.startHealthChecks(c, hp)
				}
			}
			newActive[hp] = cliAndCfg
			newRoutes[i] = cliAndCfg.client
		}

		if updateErr == nil {
			// Close the client instances that are no longer part of roster.
			// This is decided only once the update is certain to be committed,
			// so a client is never closed while c.active still refers to it.
			for ep, clicfg := range oldActive {
				if _, stillInRoster := newEndpoints[ep]; !stillInRoster {
					c.debugLog("Found updated endpoint configs, will close inactive endpoint client : %s", ep.host)
					toClose = append(toClose, clicfg)
				}
			}
			c.active = newActive
			c.routeManager.setRoutes(newRoutes)
		} else {
			// cleanup newly created clients if they are not going to be tracked further.
			toClose = newCliCfg
		}
	}
	c.lock.Unlock()

	if len(toClose) > 0 {
		// Close outside the lock and off the caller's goroutine.
		go func() {
			for _, client := range toClose {
				c.debugLog("Closing client for : %s", client.cfg.hostname)
				c.closeClient(client.client)
			}
		}()
	}
	return updateErr
}