in management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinator.scala [213:313]
/**
 * Actor behavior that is active while bootstrapping is in progress.
 *
 * Handled messages, as implemented below:
 *  - `DiscoverTick`: starts a contact point lookup via `discoverContactPoints()`.
 *  - `ServiceDiscovery.Resolved`: narrows the resolved targets with `selectHosts`, passes them to
 *    `onContactPointsResolved`, resets the discovery interval and calls `startSingleDiscoveryTimer()`.
 *  - `Failure`: clears `lastContactsObservation`, backs off the discovery interval and calls
 *    `startSingleDiscoveryTimer()`.
 *  - `ObtainedHttpSeedNodesObservation`: when a contacts observation exists, stores the seed nodes
 *    reported by a contact point that is part of it, and calls `decide()` if any seed nodes were reported.
 *  - `DecideTick`: calls `decide()`.
 *  - `JoinDecision`: clears `decisionInProgress`; for a non-empty `JoinOtherSeedNodes` or for `JoinSelf`
 *    it issues the join and stops this actor.
 *  - `ProbingFailed`: drops the stored observation for that contact point and calls
 *    `startSingleDiscoveryTimer()`.
 *
 * @param replyTo                not referenced by any of the handlers in this behavior
 * @param selfContactPointScheme passed through to `onContactPointsResolved` together with the
 *                               filtered contact points
 */
def bootstrapping(replyTo: ActorRef, selfContactPointScheme: String): Receive = {
  case DiscoverTick =>
    // a further discovery round is only performed once this one has returned
    discoverContactPoints()

  case ServiceDiscovery.Resolved(_, contactPoints) =>
    val selectedTargets: Iterable[ResolvedTarget] =
      selectHosts(
        lookup,
        settings.contactPoint.fallbackPort,
        settings.contactPoint.filterOnFallbackPort,
        contactPoints)

    log.info(
      BootstrapLogMarker.resolved(formatContactPoints(selectedTargets)),
      "Located service members based on: [{}]: [{}], filtered to [{}]",
      lookup,
      contactPoints.mkString(", "),
      formatContactPoints(selectedTargets).mkString(", "))
    onContactPointsResolved(selectedTargets, selfContactPointScheme)

    // we may have been backed-off before; return to the healthy interval
    resetDiscoveryInterval()
    // keep looking, other nodes may still show up in discovery
    startSingleDiscoveryTimer()

  case failure: Failure =>
    log.warning(BootstrapLogMarker.resolveFailed, "Resolve attempt failed! Cause: {}", failure.cause)
    // no join decision may be taken until discoverContactPoints succeeds again
    lastContactsObservation = None
    backoffDiscoveryInterval()
    startSingleDiscoveryTimer()

  case ObtainedHttpSeedNodesObservation(observedAt, contactPoint, infoFromAddress, observedSeedNodes) =>
    lastContactsObservation.foreach { observation =>
      val fromObservedContactPoint = observation.observedContactPoints.contains(contactPoint)
      if (fromObservedContactPoint) {
        log.info(
          BootstrapLogMarker.seedNodes(observedSeedNodes),
          "Contact point [{}] returned [{}] seed-nodes [{}]",
          infoFromAddress,
          observedSeedNodes.size,
          observedSeedNodes.mkString(", "))

        val latest = new SeedNodesObservation(observedAt, contactPoint, infoFromAddress, observedSeedNodes)
        seedNodesObservations = seedNodesObservations.updated(contactPoint, latest)
      }

      // reported seed nodes most likely mean we should join them right away
      if (observedSeedNodes.nonEmpty) decide()
    }

  case DecideTick =>
    decide()

  case decision: JoinDecision =>
    decisionInProgress = false
    decision match {
      case KeepProbing =>
        // nothing to do: scheduled lookups and probing of discovered contact points carry on

      case JoinOtherSeedNodes(seedNodes) if seedNodes.nonEmpty =>
        log.info(
          BootstrapLogMarker.join(seedNodes),
          "Joining [{}] to existing cluster [{}]",
          cluster.selfAddress,
          seedNodes.mkString(", "))

        // our own address is excluded from the join targets; their order doesn't matter
        cluster.joinSeedNodes((seedNodes - cluster.selfAddress).toList)

        // bootstrapping is complete as soon as the join has been issued
        context.stop(self)

      case JoinOtherSeedNodes(_) =>
        // empty seed node set: no join is issued and this actor keeps running

      case JoinSelf =>
        log.info(
          BootstrapLogMarker.joinSelf,
          "Initiating new cluster, self-joining [{}]. " +
          "Other nodes are expected to locate this cluster via continued contact-point probing.",
          cluster.selfAddress)
        cluster.join(cluster.selfAddress)

        // bootstrapping is complete as soon as the join has been issued
        context.stop(self)
    }

  case ProbingFailed(contactPoint, _) =>
    // The child actor will have terminated by now, so we piggyback on another discovery round: it looks
    // up the target nodes again and, if this one still exists, that causes it to be probed again.
    // This way we do not keep probing nodes that have simply been removed from the deployment.
    lastContactsObservation
      .filter(_.observedContactPoints.contains(contactPoint))
      .foreach { observation =>
        log.info(
          BootstrapLogMarker.seedNodesProbingFailed(formatContactPoints(observation.observedContactPoints)),
          "Received signal that probing has failed, scheduling contact point probing again")
      }

    // the earlier observation for this contact point may be obsolete, so drop it
    seedNodesObservations -= contactPoint
    startSingleDiscoveryTimer()
}