in management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/LowestAddressJoinDecider.scala [39:114]
/**
 * Decides the next bootstrap step from the current contact point observations.
 *
 * The checks run in this order:
 *  1. If `info.hasSeedNodes`, join the nodes returned by `joinOtherSeedNodes`, or keep probing when
 *     that set is empty.
 *  2. If `hasEnoughContactPoints` is false, keep probing; likewise if `isPastStableMargin` is false.
 *  3. If confirmed communication with all contact points is required and some contact point has no
 *     seed node observation, keep probing.
 *  4. Otherwise join self only when `settings.newClusterEnabled` is set and the lowest address contact
 *     point satisfies `canJoinSelf`; in every remaining case keep probing.
 *
 * @param info the seed node observations gathered so far
 * @return the decision, wrapped through `asCompletedFuture`
 */
override def decide(info: SeedNodesInformation): Future[JoinDecision] = {
  // Rebuilt at every log call site, where it is passed as the first argument.
  def progressMarker =
    BootstrapLogMarker.inProgress(info.contactPoints.map(contactPointString), info.allSeedNodes)

  // Comma separated rendering of all discovered contact points; only used in log output.
  def discoveredContactPoints: String =
    info.contactPoints.map(contactPointString).mkString(", ")

  // Only reached when no seed nodes were reported, enough contact points were found
  // and the stable margin check has passed.
  def decideWithoutSeedNodes: Future[JoinDecision] = {
    val unconfirmed =
      if (isConfirmedCommunicationWithAllContactPointsRequired(info))
        info.contactPoints -- info.seedNodesObservations.map(_.contactPoint)
      else
        Set.empty[ResolvedTarget]

    if (unconfirmed.nonEmpty) {
      // Some contact points have no seed node observation yet (e.g. their probe failed).
      if (log.isInfoEnabled)
        log.info(
          progressMarker,
          "Exceeded stable margins but missing seed node information from some contact points [{}] (out of [{}])",
          unconfirmed.map(contactPointString).mkString(", "),
          discoveredContactPoints)
      KeepProbing.asCompletedFuture
    } else {
      // Every contact point that had to answer did so.
      val lowest = lowestAddressContactPoint(info)
      // Evaluated unconditionally, before new-cluster-enabled is looked at.
      val lowestMayJoinSelf = lowest.exists(canJoinSelf(_, info))
      // Rendering of the node expected to self-join; empty string when there is no lowest address.
      def expectedInitiator: String = lowest.map(contactPointString).getOrElse("")

      if (!settings.newClusterEnabled) {
        if (log.isWarningEnabled)
          log.warning(
            progressMarker,
            "Exceeded stable margins without locating seed-nodes, however this node {} is configured with " +
            "new-cluster-enabled=off, thus NOT joining self. Expecting existing cluster or node [{}] " +
            "(out of [{}]) to perform the self-join and initiate the cluster.",
            contactPointString(selfContactPoint),
            expectedInitiator,
            discoveredContactPoints)
        // Probing goes on until some other node forms the cluster.
        KeepProbing.asCompletedFuture
      } else if (lowestMayJoinSelf) {
        JoinSelf.asCompletedFuture
      } else {
        if (log.isInfoEnabled)
          log.info(
            progressMarker,
            "Exceeded stable margins without locating seed-nodes, however this node {} is NOT the lowest address " +
            "out of the discovered endpoints in this deployment, thus NOT joining self. Expecting node [{}] " +
            "(out of [{}]) to perform the self-join and initiate the cluster.",
            contactPointString(selfContactPoint),
            expectedInitiator,
            discoveredContactPoints)
        // Probing goes on until the lowest addressed node joins itself. DNS changes may still turn
        // this node into the lowest address; in that case the same path is taken again once the
        // stable margin has been exceeded, and the self-join then succeeds.
        KeepProbing.asCompletedFuture
      }
    }
  }

  if (info.hasSeedNodes) {
    val seeds = joinOtherSeedNodes(info)
    if (seeds.nonEmpty) JoinOtherSeedNodes(seeds).asCompletedFuture
    else KeepProbing.asCompletedFuture
  } else if (hasEnoughContactPoints(info)) {
    if (isPastStableMargin(info))
      decideWithoutSeedNodes
    else {
      log.debug(
        progressMarker,
        "Contact points observations have changed more recently than the stable-margin [{}], changed at [{}], " +
        "not joining myself. This process will be retried.",
        settings.contactPointDiscovery.stableMargin,
        info.contactPointsChangedAt)
      KeepProbing.asCompletedFuture
    }
  } else {
    log.info(
      progressMarker,
      "Discovered [{}] contact points, confirmed [{}], which is less than the required [{}], retrying",
      info.contactPoints.size,
      info.seedNodesObservations.size,
      settings.contactPointDiscovery.requiredContactPointsNr)
    KeepProbing.asCompletedFuture
  }
}