in cluster/cluster.go [131:255]
// Create parses the bind and advertise addresses, resolves knownPeers, and
// returns a Peer backed by a freshly created memberlist.
//
// bindAddr must be in host:port form. advertiseAddr is optional; when it is
// non-empty it must be host:port as well. When advertiseAddr is empty and
// isAny reports true for bindAddr, the address returned by
// calculateAdvertiseAddress is advertised together with the bind port.
//
// pushPullInterval, gossipInterval, tcpTimeout, probeTimeout and
// probeInterval are copied onto the memberlist configuration, and label is
// set as the memberlist label. A non-nil tlsTransportConfig installs a TLS
// transport built by NewTLSTransport.
//
// An error is returned when an address cannot be parsed, when peer
// resolution fails, when the node name cannot be generated, or when the TLS
// transport or the memberlist cannot be created. Failing to deduce an
// advertise address is only logged as a warning.
func Create(
	l log.Logger,
	reg prometheus.Registerer,
	bindAddr string,
	advertiseAddr string,
	knownPeers []string,
	waitIfEmpty bool,
	pushPullInterval time.Duration,
	gossipInterval time.Duration,
	tcpTimeout time.Duration,
	probeTimeout time.Duration,
	probeInterval time.Duration,
	tlsTransportConfig *TLSTransportConfig,
	allowInsecureAdvertise bool,
	label string,
) (*Peer, error) {
	bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
	if err != nil {
		return nil, fmt.Errorf("invalid listen address: %w", err)
	}
	bindPort, err := strconv.Atoi(bindPortStr)
	if err != nil {
		return nil, fmt.Errorf("address %s: invalid port: %w", bindAddr, err)
	}

	// The advertise address is optional; host and port keep their zero
	// values when it is not supplied.
	var advertiseHost string
	var advertisePort int
	if advertiseAddr != "" {
		var advertisePortStr string
		advertiseHost, advertisePortStr, err = net.SplitHostPort(advertiseAddr)
		if err != nil {
			return nil, fmt.Errorf("invalid advertise address: %w", err)
		}
		advertisePort, err = strconv.Atoi(advertisePortStr)
		if err != nil {
			return nil, fmt.Errorf("address %s: invalid port: %w", advertiseAddr, err)
		}
	}

	resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, &net.Resolver{}, waitIfEmpty)
	if err != nil {
		return nil, fmt.Errorf("resolve peers: %w", err)
	}
	level.Debug(l).Log("msg", "resolved peers to following addresses", "peers", strings.Join(resolvedPeers, ","))

	// Initial validation of user-specified advertise address.
	addr, err := calculateAdvertiseAddress(bindHost, advertiseHost, allowInsecureAdvertise)
	if err != nil {
		// Not fatal: creation continues without a deduced address.
		level.Warn(l).Log("err", "couldn't deduce an advertise address: "+err.Error())
	} else if hasNonlocal(resolvedPeers) && isUnroutable(addr.String()) {
		level.Warn(l).Log("err", "this node advertises itself on an unroutable address", "addr", addr.String())
		level.Warn(l).Log("err", "this node will be unreachable in the cluster")
		level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
	} else if isAny(bindAddr) && advertiseHost == "" {
		// memberlist doesn't advertise properly when the bind address is empty or unspecified.
		level.Info(l).Log("msg", "setting advertise address explicitly", "addr", addr.String(), "port", bindPort)
		advertiseHost = addr.String()
		advertisePort = bindPort
	}

	// TODO(fabxc): generate human-readable but random names?
	name, err := ulid.New(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))
	if err != nil {
		return nil, err
	}

	p := &Peer{
		states:        map[string]State{},
		stopc:         make(chan struct{}),
		readyc:        make(chan struct{}),
		logger:        l,
		peers:         map[string]peer{},
		resolvedPeers: resolvedPeers,
		knownPeers:    knownPeers,
	}

	p.register(reg, name.String())

	// Half the number of configured peers, but never fewer than 3. The same
	// value is used for the delegate and for cfg.GossipNodes below.
	retransmit := len(knownPeers) / 2
	if retransmit < 3 {
		retransmit = 3
	}
	p.delegate = newDelegate(l, reg, p, retransmit)

	cfg := memberlist.DefaultLANConfig()
	cfg.Name = name.String()
	cfg.BindAddr = bindHost
	cfg.BindPort = bindPort
	cfg.Delegate = p.delegate
	cfg.Ping = p.delegate
	cfg.Alive = p.delegate
	cfg.Events = p.delegate
	cfg.GossipInterval = gossipInterval
	cfg.PushPullInterval = pushPullInterval
	cfg.TCPTimeout = tcpTimeout
	cfg.ProbeTimeout = probeTimeout
	cfg.ProbeInterval = probeInterval
	cfg.LogOutput = &logWriter{l: l}
	cfg.GossipNodes = retransmit
	cfg.UDPBufferSize = MaxGossipPacketSize
	cfg.Label = label

	if advertiseHost != "" {
		cfg.AdvertiseAddr = advertiseHost
		cfg.AdvertisePort = advertisePort
		// net.JoinHostPort brackets IPv6 literals ("[::1]:9094"); plain
		// "%s:%d" formatting produced the ambiguous "::1:9094" for them.
		// NOTE(review): presumably setInitialFailed matches this value
		// against the resolved peer addresses to skip this node — confirm.
		p.setInitialFailed(resolvedPeers, net.JoinHostPort(advertiseHost, strconv.Itoa(advertisePort)))
	} else {
		p.setInitialFailed(resolvedPeers, bindAddr)
	}

	if tlsTransportConfig != nil {
		level.Info(l).Log("msg", "using TLS for gossip")
		cfg.Transport, err = NewTLSTransport(context.Background(), l, reg, cfg.BindAddr, cfg.BindPort, tlsTransportConfig)
		if err != nil {
			return nil, fmt.Errorf("tls transport: %w", err)
		}
	}

	ml, err := memberlist.Create(cfg)
	if err != nil {
		return nil, fmt.Errorf("create memberlist: %w", err)
	}
	p.mlist = ml
	return p, nil
}