// Excerpt from registry/datastore/db.go [976:1084].

// ResolveReplicas refreshes the load balancer's replica pool. Replica hosts are discovered either
// through the DNS resolver (service discovery) or the fixed hosts list, each DSN derived from the
// primary DSN with the host (and port, for service discovery) swapped out. Connections for new
// replicas are opened, known-but-stale connections are reopened, and connections for replicas that
// disappeared are closed and unregistered from metrics. Per-replica failures are accumulated in a
// multierror so one bad replica does not abort the refresh of the others; only a failure to resolve
// hosts returns early.
func (lb *DBLoadBalancer) ResolveReplicas(ctx context.Context) error {
	lb.replicaMutex.Lock()
	defer lb.replicaMutex.Unlock()

	ctx, cancel := context.WithTimeout(ctx, ReplicaResolveTimeout)
	defer cancel()

	var result *multierror.Error
	l := lb.logger(ctx)

	// Resolve replica DSNs
	var resolvedDSNs []DSN
	if lb.resolver != nil {
		l.Info("resolving replicas with service discovery")
		addrs, err := resolveHosts(ctx, lb.resolver)
		if err != nil {
			return fmt.Errorf("failed to resolve replica hosts: %w", err)
		}
		for _, addr := range addrs {
			dsn := *lb.primaryDSN
			dsn.Host = addr.IP.String()
			dsn.Port = addr.Port
			resolvedDSNs = append(resolvedDSNs, dsn)
		}
	} else if len(lb.fixedHosts) > 0 {
		l.Info("resolving replicas with fixed hosts list")
		for _, host := range lb.fixedHosts {
			dsn := *lb.primaryDSN
			dsn.Host = host
			resolvedDSNs = append(resolvedDSNs, dsn)
		}
	}

	// Open connections for _added_ replicas, reusing (or refreshing) connections for known ones.
	var outputReplicas []*DB
	var added, removed []string
	for i := range resolvedDSNs {
		var err error
		dsn := &resolvedDSNs[i]
		// Use a per-iteration logger instead of reassigning l, so the db_replica_addr field does
		// not leak into the log lines emitted after this loop (removed-replica and summary logs).
		rl := l.WithFields(logrus.Fields{"db_replica_addr": dsn.Address()})

		r := dbByAddress(lb.replicas, dsn.Address())
		if r != nil {
			// check if connection to existing replica is still usable
			if err := r.PingContext(ctx); err != nil {
				rl.WithError(err).Warn("replica is known but connection is stale, attempting to reconnect")
				stale := r
				r, err = lb.connector.Open(ctx, dsn, lb.replicaOpenOpts...)
				if err != nil {
					result = multierror.Append(result, fmt.Errorf("reopening replica %q database connection: %w", dsn.Address(), err))
					continue
				}
				// Close the stale handler so its pooled resources are released rather than leaked.
				// Best-effort: a close failure on an already-broken connection is only logged.
				if err := stale.Close(); err != nil {
					rl.WithError(err).Warn("failed to close stale replica connection handler")
				}
			} else {
				rl.Info("replica is known and healthy, reusing connection")
			}
		} else {
			rl.Info("replica is new, opening connection")
			if r, err = lb.connector.Open(ctx, dsn, lb.replicaOpenOpts...); err != nil {
				result = multierror.Append(result, fmt.Errorf("failed to open replica %q database connection: %w", dsn.Address(), err))
				continue
			}
			added = append(added, r.Address())
			metrics.ReplicaAdded()

			// Register metrics collector for the added replica
			if lb.metricsEnabled {
				collector := lb.metricsCollector(r, HostTypeReplica)
				// Unlike the primary host metrics collector, replica collectors will be registered in the background
				// whenever the pool changes. We don't want to cause a panic here, so we'll rely on prometheus.Register
				// instead of prometheus.MustRegister and gracefully handle an error by logging and reporting it.
				if err := lb.promRegisterer.Register(collector); err != nil {
					rl.WithError(err).WithFields(log.Fields{"db_replica_addr": r.Address()}).
						Error("failed to register collector for database replica metrics")
					errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace())
				}
				lb.replicaPromCollectors[r.Address()] = collector
			}
		}
		r.errorProcessor = lb
		outputReplicas = append(outputReplicas, r)
	}

	// Identify removed replicas: anything previously in the pool that is absent from the new set.
	for _, r := range lb.replicas {
		if dbByAddress(outputReplicas, r.Address()) == nil {
			removed = append(removed, r.Address())
			metrics.ReplicaRemoved()

			// Unregister the metrics collector for the removed replica
			lb.unregisterReplicaMetricsCollector(r)

			// Close handlers for retired replicas
			l.WithFields(log.Fields{"db_replica_addr": r.Address()}).Info("closing connection handler for retired replica")
			if err := r.Close(); err != nil {
				err = fmt.Errorf("failed to close retired replica %q connection: %w", r.Address(), err)
				result = multierror.Append(result, err)
				errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace())
			}
		}
	}

	l.WithFields(logrus.Fields{
		"added_hosts":   strings.Join(added, ","),
		"removed_hosts": strings.Join(removed, ","),
	}).Info("updating replicas list")
	metrics.ReplicaPoolSize(len(outputReplicas))
	lb.replicas = outputReplicas

	return result.ErrorOrNil()
}