in modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java [590:921]
/**
 * Processes a single discovery notification.
 * <p>
 * In order, the method: unpacks the notification, updates the topology history and the
 * major/minor topology version bookkeeping, lets the cluster state and cache processors react
 * to the event, records a new {@link DiscoCache} snapshot when the version changed, invokes
 * directly registered custom event listeners and finally either handles the event in place
 * (local join, client reconnect) or queues it to the discovery worker.
 *
 * @param notification Discovery notification providing the event type, the event node, the
 *      topology version, the topology snapshot, an optional topology history, an optional
 *      wrapped custom message and an optional span container.
 */
private void onDiscovery0(DiscoveryNotification notification) {
// Unpack the notification. The custom message, when present, arrives wrapped in a
// CustomMessageWrapper and is unwrapped to its delegate here.
int type = notification.type();
ClusterNode node = notification.getNode();
long topVer = notification.getTopVer();
DiscoveryCustomMessage customMsg = notification.getCustomMsgData() == null ? null
: ((CustomMessageWrapper)notification.getCustomMsgData()).delegate();
// Nothing to do for events that skipMessage(...) rejects.
if (skipMessage(notification.type(), customMsg))
return;
final ClusterNode locNode = localNode();
// Keep a read-only view of the topology history whenever the notification carries one.
if (notification.getTopHist() != null)
topHist = Collections.unmodifiableNavigableMap(notification.getTopHist());
// 'verChanged' tells whether a new topology snapshot has to be recorded below.
// Metrics updates, segmentation, client disconnect/reconnect and custom events do not
// change the version at this point; any other event type resets the minor version to 0
// and marks the version as changed. Custom events may still set the flag later on.
boolean verChanged;
if (type == EVT_NODE_METRICS_UPDATED)
verChanged = false;
else {
if (type != EVT_NODE_SEGMENTED &&
type != EVT_CLIENT_NODE_DISCONNECTED &&
type != EVT_CLIENT_NODE_RECONNECTED &&
type != EVT_DISCOVERY_CUSTOM_EVT) {
minorTopVer = 0;
verChanged = true;
}
else
verChanged = false;
}
// A node is gone: let every DiscoCache kept in the history update its alive nodes and
// call updateClientNodes(...) for the departed node id.
if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) {
for (DiscoCache c : discoCacheHist.values())
c.updateAlives(node);
updateClientNodes(node.id());
}
// Local join event: a join event whose node id equals the local node id.
boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id());
// On node departure the cluster state processor may hand back a finish message; when it
// is non-null it is queued to the discovery worker as a custom event at the end of this method.
ChangeGlobalStateFinishMessage stateFinishMsg = null;
if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)
stateFinishMsg = ctx.state().onNodeLeft(node);
final AffinityTopologyVersion nextTopVer;
if (type == EVT_DISCOVERY_CUSTOM_EVT) {
assert customMsg != null;
// For custom events the handler of the message decides whether the minor topology
// version must be incremented.
boolean incMinorTopVer;
if (customMsg instanceof ChangeGlobalStateMessage) {
incMinorTopVer = ctx.state().onStateChangeMessage(
new AffinityTopologyVersion(topVer, minorTopVer),
(ChangeGlobalStateMessage)customMsg,
discoCache());
}
else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
Snapshot snapshot = topSnap.get();
// Topology version does not change, but need create DiscoCache with new state.
DiscoCache discoCache = snapshot.discoCache.copy(snapshot.topVer, ctx.state().clusterState());
topSnap.set(new Snapshot(snapshot.topVer, discoCache));
incMinorTopVer = false;
}
else {
incMinorTopVer = ctx.cache().onCustomEvent(
customMsg,
new AffinityTopologyVersion(topVer, minorTopVer),
node);
}
if (incMinorTopVer) {
minorTopVer++;
verChanged = true;
}
nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
// The cache processor is notified about a custom event only when it changed the version.
if (incMinorTopVer)
ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState());
}
else {
// Non-custom events: the minor version was either reset above or is left as is;
// the cache processor is always notified.
nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState());
}
DiscoCache discoCache;
// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
if (verChanged) {
Snapshot snapshot = topSnap.get();
// Choose how the DiscoCache for the new version is built:
// - no custom message: from the notification's topology snapshot and current cluster state;
// - state change message: same, but with the pending state taken from the state processor;
// - any other custom message: the message itself derives it from the previous DiscoCache.
if (customMsg == null) {
discoCache = createDiscoCache(
nextTopVer,
ctx.state().clusterState(),
locNode,
notification.getTopSnapshot());
}
else if (customMsg instanceof ChangeGlobalStateMessage) {
discoCache = createDiscoCache(
nextTopVer,
ctx.state().pendingState((ChangeGlobalStateMessage)customMsg),
locNode,
notification.getTopSnapshot());
}
else
discoCache = customMsg.createDiscoCache(GridDiscoveryManager.this, nextTopVer, snapshot.discoCache);
discoCacheHist.put(nextTopVer, discoCache);
// The new version must be strictly greater than the currently published one.
assert snapshot.topVer.compareTo(nextTopVer) < 0 : "Topology version out of order [this.topVer=" +
topSnap + ", topVer=" + topVer + ", node=" + node + ", nextTopVer=" + nextTopVer +
", evt=" + U.gridEventName(type) + ']';
topSnap.set(new Snapshot(nextTopVer, discoCache));
}
else
// Current version.
discoCache = discoCache();
// For a local join, or for join/left/failed events of a non-client node, ask the state
// processor via autoAdjustInMemoryClusterState(...) whether the DiscoCache has to be
// recalculated. If so, rebuild it with the current cluster state and replace both the
// history entry and the published snapshot for 'nextTopVer'.
if (locJoinEvt || !node.isClient()) {
if (type == EVT_NODE_LEFT || type == EVT_NODE_FAILED || type == EVT_NODE_JOINED) {
boolean discoCacheRecalculationRequired = ctx.state().autoAdjustInMemoryClusterState(
node.id(),
notification.getTopSnapshot(),
discoCache,
topVer,
minorTopVer
);
if (discoCacheRecalculationRequired) {
discoCache = createDiscoCache(
nextTopVer,
ctx.state().clusterState(),
locNode,
notification.getTopSnapshot()
);
discoCacheHist.put(nextTopVer, discoCache);
topSnap.set(new Snapshot(nextTopVer, discoCache));
}
}
}
// Notify listeners registered directly for the custom message class or any of its
// superclasses. An exception thrown by one listener is logged and does not prevent
// the remaining listeners from being called.
if (type == EVT_DISCOVERY_CUSTOM_EVT) {
for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
if (list != null) {
for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
try {
lsnr.onCustomEvent(nextTopVer, node, customMsg);
}
catch (Exception e) {
U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
}
}
}
}
}
// Security context that is attached to every NotificationEvent queued below.
SecurityContext secCtx = remoteSecurityContext(ctx);
// If this is a local join event, just save it and do not notify listeners.
if (locJoinEvt) {
// Grid start time is taken from the SPI only while it is still unset (0).
if (gridStartTime == 0)
gridStartTime = getSpi().getGridStartTime();
topSnap.set(new Snapshot(nextTopVer, discoCache));
startLatch.countDown();
// Build the local join event that is handed to the components and stored in 'locJoin'.
DiscoveryEvent discoEvt = new DiscoveryEvent();
discoEvt.node(ctx.discovery().localNode());
discoEvt.eventNode(node);
discoEvt.type(EVT_NODE_JOINED);
discoEvt.topologySnapshot(topVer, new ArrayList<>(notification.getTopSnapshot()));
if (notification.getSpanContainer() != null)
discoEvt.span(notification.getSpanContainer().span());
discoWrk.discoCache = discoCache;
// Components are notified about the local join only if the client is not disconnected.
if (!ctx.clientDisconnected()) {
// The security processor must be notified first, since {@link IgniteSecurity#onLocalJoin}
// finishes local node security context initialization that can be demanded by other Ignite
// components.
ctx.security().onLocalJoin();
ctx.cache().context().versions().onLocalJoin(topVer);
ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache);
ctx.service().onLocalJoin(discoEvt, discoCache);
ctx.encryption().onLocalJoin();
ctx.cluster().onLocalJoin();
}
IgniteInternalFuture<Boolean> transitionWaitFut = ctx.state().onLocalJoin(discoCache);
// Complete the local join future; the event is not queued to the discovery worker.
locJoin.onDone(new DiscoveryLocalJoinData(discoEvt,
discoCache,
transitionWaitFut,
ctx.state().clusterState().state().active()));
return;
}
else if (type == EVT_CLIENT_NODE_DISCONNECTED) {
/*
* Notify all components from discovery thread to avoid concurrent
* reconnect while disconnect handling is in progress.
*/
assert locNode.isClient() : locNode;
assert node.isClient() : node;
((IgniteKernal)ctx.grid()).onDisconnected();
// Fail a still pending local join future and start over with a fresh one.
if (!locJoin.isDone())
locJoin.onDone(new IgniteCheckedException("Node disconnected"));
locJoin = new GridFutureAdapter<>();
registeredCaches.clear();
registeredCacheGrps.clear();
// Drop the whole DiscoCache history entry by entry, asserting every key was present.
// NOTE(review): entries are removed while iterating keySet(); this relies on
// discoCacheHist being a map whose iterators tolerate concurrent removal - confirm
// against the field declaration.
for (AffinityTopologyVersion histVer : discoCacheHist.keySet()) {
Object rmvd = discoCacheHist.remove(histVer);
assert rmvd != null : histVer;
}
// Reset the topology history and publish a snapshot at version ZERO that contains
// only the local node.
topHist = Collections.emptyNavigableMap();
topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode,
Collections.singleton(locNode))
));
// No return here: the disconnect event is queued to the discovery worker below.
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
assert node.isClient() : node;
ctx.security().onLocalJoin();
// A grid start time different from the remembered one is treated as a cluster restart.
boolean clusterRestarted = gridStartTime != getSpi().getGridStartTime();
gridStartTime = getSpi().getGridStartTime();
((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache);
ctx.service().onLocalJoin(localJoinEvent(), discoCache);
// Effectively final copy of 'discoCache' for use inside the anonymous listener.
DiscoCache discoCache0 = discoCache;
// The reconnect event is queued to the discovery worker only after the client
// reconnect future has completed; if fut.get() throws IgniteException the event
// is not queued.
ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() {
@Override public void apply(IgniteFuture<?> fut) {
try {
fut.get();
discoWrk.addEvent(
new NotificationEvent(
EVT_CLIENT_NODE_RECONNECTED,
nextTopVer,
node,
discoCache0,
notification.getTopSnapshot(),
null,
notification.getSpanContainer(),
secCtx
)
);
}
catch (IgniteException ignore) {
// No-op.
}
}
});
return;
}
// Queue the event to the discovery worker. While the client is disconnected only the
// disconnect and segmentation events are queued.
if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
discoWrk.addEvent(
new NotificationEvent(
type,
nextTopVer,
node, discoCache,
notification.getTopSnapshot(),
customMsg,
notification.getSpanContainer(),
secCtx
)
);
// Queue the finish message returned by ctx.state().onNodeLeft(...) as a custom event,
// after the original event.
if (stateFinishMsg != null)
discoWrk.addEvent(
new NotificationEvent(
EVT_DISCOVERY_CUSTOM_EVT,
nextTopVer,
node,
discoCache,
notification.getTopSnapshot(),
stateFinishMsg,
notification.getSpanContainer(),
secCtx
)
);
// NOTE(review): presumably blocks until the discovery worker has handled the queued
// disconnect event - confirm against awaitDisconnectEvent().
if (type == EVT_CLIENT_NODE_DISCONNECTED)
discoWrk.awaitDisconnectEvent();
}