in build-index/tagserver/server.go [556:601]
// replicateTag queues replication of tag (pointing at digest d, with
// dependencies deps) to every remote destination that s.remotes matches for
// the tag, and then asks each resolved neighbor to duplicate that replication.
//
// It returns nil immediately when no destination matches. If a task cannot be
// added to the tag replication manager, the wrapped error is returned right
// away and neither the remaining destinations nor the neighbors are processed.
// Neighbor notification failures are only logged, never returned; when every
// neighbor notification fails, the "duplicate_replicate_failures" counter is
// incremented.
func (s *Server) replicateTag(ctx context.Context, tag string, d core.Digest, deps core.DigestList) error {
	remoteDests := s.remotes.Match(tag)

	// Every log line below shares the same trace-scoped logger and digest text.
	tlog := log.WithTraceContext(ctx)
	digest := d.String()

	tlog.With("tag", tag, "digest", digest, "destination_count", len(remoteDests)).Debug("Checking remote destinations for tag replication")
	if len(remoteDests) == 0 {
		tlog.With("tag", tag, "digest", digest).Debug("No remote destinations configured for tag")
		return nil
	}

	tlog.With("tag", tag, "digest", digest, "destinations", remoteDests).Info("Adding remote replication tasks")
	for _, dest := range remoteDests {
		// NOTE(review): the trailing 0 is presumably the task's initial delay — confirm
		// against tagreplication.NewTask.
		err := s.tagReplicationManager.Add(tagreplication.NewTask(tag, d, deps, dest, 0))
		if err != nil {
			return fmt.Errorf("add replicate task: %w", err)
		}
		tlog.With("tag", tag, "digest", digest, "destination", dest).Debug("Added remote replication task")
	}

	neighbors := s.neighbors.Resolve()
	neighborCount := len(neighbors)
	tlog.With("tag", tag, "digest", digest, "neighbor_count", neighborCount).Debug("Notifying neighbors about remote replication")

	var (
		stagger  time.Duration
		notified int
	)
	for addr := range neighbors { // Loops in random order.
		// Each neighbor is handed a delay one stagger interval longer than the
		// previous one, whether or not the earlier notifications succeeded.
		stagger += s.config.DuplicateReplicateStagger
		if err := s.provider.Provide(addr).DuplicateReplicate(tag, d, deps, stagger); err != nil {
			// Best effort: a neighbor failure is recorded but does not abort the loop.
			tlog.With("tag", tag, "digest", digest, "neighbor", addr, "delay", stagger).Errorf("Failed to notify neighbor about replication: %s", err)
			continue
		}
		notified++
		tlog.With("tag", tag, "digest", digest, "neighbor", addr, "delay", stagger).Debug("Successfully notified neighbor about replication")
	}

	tlog.With("tag", tag, "digest", digest, "remote_destinations", len(remoteDests), "notified_neighbors", notified, "failed_neighbors", neighborCount-notified).Info("Completed remote replication setup")

	// Only a total failure across a non-empty neighbor set is counted as a failure.
	if neighborCount > 0 && notified == 0 {
		s.stats.Counter("duplicate_replicate_failures").Inc(1)
		tlog.With("tag", tag, "digest", digest, "neighbor_count", neighborCount).Error("All neighbor replication notifications failed")
	}
	return nil
}