in registry/datastore/db.go [976:1084]
func (lb *DBLoadBalancer) ResolveReplicas(ctx context.Context) error {
	lb.replicaMutex.Lock()
	defer lb.replicaMutex.Unlock()

	ctx, cancel := context.WithTimeout(ctx, ReplicaResolveTimeout)
	defer cancel()

	var result *multierror.Error
	l := lb.logger(ctx)

	// Resolve replica DSNs
	var resolvedDSNs []DSN
	if lb.resolver != nil {
		l.Info("resolving replicas with service discovery")
		addrs, err := resolveHosts(ctx, lb.resolver)
		if err != nil {
			return fmt.Errorf("failed to resolve replica hosts: %w", err)
		}
		for _, addr := range addrs {
			dsn := *lb.primaryDSN
			dsn.Host = addr.IP.String()
			dsn.Port = addr.Port
			resolvedDSNs = append(resolvedDSNs, dsn)
		}
	} else if len(lb.fixedHosts) > 0 {
		l.Info("resolving replicas with fixed hosts list")
		for _, host := range lb.fixedHosts {
			dsn := *lb.primaryDSN
			dsn.Host = host
			resolvedDSNs = append(resolvedDSNs, dsn)
		}
	}

	// Open connections for _added_ replicas
	var outputReplicas []*DB
	var added, removed []string
	for i := range resolvedDSNs {
		var err error
		dsn := &resolvedDSNs[i]
		l = l.WithFields(logrus.Fields{"db_replica_addr": dsn.Address()})

		r := dbByAddress(lb.replicas, dsn.Address())
		if r != nil {
			// check if connection to existing replica is still usable
			if err := r.PingContext(ctx); err != nil {
				l.WithError(err).Warn("replica is known but connection is stale, attempting to reconnect")
				r, err = lb.connector.Open(ctx, dsn, lb.replicaOpenOpts...)
				if err != nil {
					result = multierror.Append(result, fmt.Errorf("reopening replica %q database connection: %w", dsn.Address(), err))
					continue
				}
			} else {
				l.Info("replica is known and healthy, reusing connection")
			}
		} else {
			l.Info("replica is new, opening connection")
			if r, err = lb.connector.Open(ctx, dsn, lb.replicaOpenOpts...); err != nil {
				result = multierror.Append(result, fmt.Errorf("failed to open replica %q database connection: %w", dsn.Address(), err))
				continue
			}
			added = append(added, r.Address())
			metrics.ReplicaAdded()

			// Register metrics collector for the added replica
			if lb.metricsEnabled {
				collector := lb.metricsCollector(r, HostTypeReplica)
				// Unlike the primary host metrics collector, replica collectors will be registered in the background
				// whenever the pool changes. We don't want to cause a panic here, so we'll rely on prometheus.Register
				// instead of prometheus.MustRegister and gracefully handle an error by logging and reporting it.
				if err := lb.promRegisterer.Register(collector); err != nil {
					l.WithError(err).WithFields(log.Fields{"db_replica_addr": r.Address()}).
						Error("failed to register collector for database replica metrics")
					errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace())
				}
				lb.replicaPromCollectors[r.Address()] = collector
			}
		}
		r.errorProcessor = lb
		outputReplicas = append(outputReplicas, r)
	}

	// Identify removed replicas
	for _, r := range lb.replicas {
		if dbByAddress(outputReplicas, r.Address()) == nil {
			removed = append(removed, r.Address())
			metrics.ReplicaRemoved()

			// Unregister the metrics collector for the removed replica
			lb.unregisterReplicaMetricsCollector(r)

			// Close handlers for retired replicas
			l.WithFields(log.Fields{"db_replica_addr": r.Address()}).Info("closing connection handler for retired replica")
			if err := r.Close(); err != nil {
				err = fmt.Errorf("failed to close retired replica %q connection: %w", r.Address(), err)
				result = multierror.Append(result, err)
				errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace())
			}
		}
	}

	l.WithFields(logrus.Fields{
		"added_hosts":   strings.Join(added, ","),
		"removed_hosts": strings.Join(removed, ","),
	}).Info("updating replicas list")

	metrics.ReplicaPoolSize(len(outputReplicas))
	lb.replicas = outputReplicas

	return result.ErrorOrNil()
}
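
The helper dbByAddress is referenced above but not included in this range. A minimal sketch of what it plausibly does, assuming it simply matches handlers by their Address() string, is:

// Hypothetical sketch only; the real dbByAddress lives elsewhere in
// registry/datastore/db.go and may differ in detail.
func dbByAddress(dbs []*DB, addr string) *DB {
	for _, db := range dbs {
		if db.Address() == addr {
			return db
		}
	}
	return nil
}

ResolveReplicas is written to be re-run whenever the replica pool may have changed, which is why collector registration tolerates errors instead of panicking. A sketch of a background refresh loop that a caller might run, written as if it lived in the same package, is shown below; the method name, interval parameter, and use of context cancellation are assumptions for illustration and do not appear in the excerpt above.

// startReplicaRefresh is a hypothetical driver: it re-resolves the replica
// pool on a fixed interval until the context is canceled.
func (lb *DBLoadBalancer) startReplicaRefresh(ctx context.Context, every time.Duration) {
	go func() {
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// ResolveReplicas aggregates per-replica errors, so one bad host
				// does not prevent the rest of the pool from being refreshed.
				if err := lb.ResolveReplicas(ctx); err != nil {
					lb.logger(ctx).WithError(err).Error("refreshing database replica pool")
				}
			}
		}
	}()
}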