func()

in dax/internal/client/cluster.go [570:638]


// update reconciles the cluster's active clients and routes with the endpoint
// roster given in config.
//
// Endpoints that already have an active client keep it; a client is created
// for every other endpoint, and health checks are started on it when it
// implements HealthCheckDaxAPI. Once every endpoint has a client, c.active and
// the route manager's routes are replaced together, and the clients whose
// endpoints are absent from config are closed on a separate goroutine.
//
// The update is all-or-nothing. If the cluster is already closed nothing is
// changed and nil is returned. If a client cannot be created, c.active and the
// routes are left untouched, the clients created during this call are closed,
// previously active clients are kept open (they are still routed to), and the
// creation error is returned.
func (c *cluster) update(config []serviceEndpoint) error {
	toClose, err := c.reconcileRoster(config)

	if len(toClose) > 0 {
		// Close outside the lock and off the caller's goroutine so neither
		// waits on client teardown.
		go func() {
			for _, client := range toClose {
				c.debugLog("Closing client for : %s", client.cfg.hostname)
				c.closeClient(client.client)
			}
		}()
	}
	return err
}

// reconcileRoster performs the part of update that runs under c.lock.
//
// The returned slice holds the clients the caller must close after the lock
// has been released, and it is meaningful whether or not an error is returned:
// on success it contains the previously active clients that are no longer part
// of the roster; on failure it contains only the clients created by this call.
func (c *cluster) reconcileRoster(config []serviceEndpoint) ([]clientAndConfig, error) {
	newActive := make(map[hostPort]clientAndConfig, len(config))
	newRoutes := make([]DaxAPI, len(config))
	// Clients created by this call, tracked so that they can be cleaned up if
	// the update is abandoned part-way through.
	var created []clientAndConfig

	c.lock.Lock()
	defer c.lock.Unlock()

	if c.closed {
		return nil, nil
	}

	// Reuse existing clients and create clients for endpoints new to the roster.
	for i, ep := range config {
		hp := ep.hostPort()
		cliAndCfg, alreadyExists := c.active[hp]
		if !alreadyExists {
			cli, err := c.newSingleClient(ep)
			if err != nil {
				// Abandon the update. Only the clients created above are
				// handed back for closing; everything in c.active is still
				// routed to and must stay open.
				return created, err
			}
			cliAndCfg = clientAndConfig{client: cli, cfg: ep}
			created = append(created, cliAndCfg)

			if singleCli, ok := cli.(HealthCheckDaxAPI); ok {
				singleCli.startHealthChecks(c, hp)
			}
		}
		newActive[hp] = cliAndCfg
		newRoutes[i] = cliAndCfg.client
	}

	// The update is now certain to be applied, so it is safe to retire the
	// clients that are no longer part of the roster. newActive has exactly one
	// key per roster endpoint, so it doubles as the membership set.
	var stale []clientAndConfig
	for ep, clicfg := range c.active {
		if _, stillInRoster := newActive[ep]; !stillInRoster {
			c.debugLog("Found updated endpoint configs, will close inactive endpoint client : %s", ep.host)
			stale = append(stale, clicfg)
		}
	}

	c.active = newActive
	c.routeManager.setRoutes(newRoutes)
	return stale, nil
}