in modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java [79:223]
/**
 * Creates a {@link ClusterService} backed by a ScaleCube cluster.
 *
 * <p>The returned service is not started: the connection manager and the ScaleCube cluster are only created
 * when {@code start()} is called on it.
 *
 * @param consistentId Consistent ID of the local node; used as the ScaleCube member alias and passed to the messaging
 *      service and the connection manager.
 * @param networkConfiguration Network configuration; its current value is read when the service is started.
 * @param nettyBootstrapFactory Bootstrap factory passed to the connection manager.
 * @param serializationRegistry Registry passed to the cluster service and to the serialization service.
 * @param staleIds Stale ID registry; IDs of nodes that disappear from the topology are marked as stale in it.
 * @return New, not yet started cluster service.
 */
public ClusterService createClusterService(
        String consistentId,
        NetworkConfiguration networkConfiguration,
        NettyBootstrapFactory nettyBootstrapFactory,
        MessageSerializationRegistry serializationRegistry,
        StaleIds staleIds
) {
    var messageFactory = new NetworkMessagesFactory();

    var topologyService = new ScaleCubeTopologyService();

    UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();

    var messagingService = new DefaultMessagingService(
            consistentId,
            messageFactory,
            topologyService,
            userObjectSerialization.descriptorRegistry(),
            userObjectSerialization.marshaller()
    );

    return new AbstractClusterService(consistentId, topologyService, messagingService, serializationRegistry) {
        /** ScaleCube cluster; {@code null} until {@link #start()} has built it. */
        private volatile ClusterImpl cluster;

        /** Connection manager; {@code null} until {@link #start()} has created it. */
        private volatile ConnectionManager connectionMgr;

        /** Future obtained from the cluster's shutdown signal; {@code null} until {@link #start()} has built the cluster. */
        private volatile CompletableFuture<Void> shutdownFuture;

        /** {@inheritDoc} */
        @Override
        public void start() {
            var serializationService = new SerializationService(serializationRegistry, userObjectSerialization);

            // The same launch ID is given to the connection manager and used as the ScaleCube member ID.
            UUID launchId = UUID.randomUUID();

            NetworkView configView = networkConfiguration.value();

            connectionMgr = new ConnectionManager(
                    configView,
                    serializationService,
                    launchId,
                    consistentId,
                    nettyBootstrapFactory,
                    staleIds
            );

            connectionMgr.start();

            var transport = new ScaleCubeDirectMarshallerTransport(
                    connectionMgr.localAddress(),
                    messagingService,
                    topologyService,
                    messageFactory
            );

            NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());

            cluster = new ClusterImpl(clusterConfig(configView.membership()))
                    .handler(cl -> new ClusterMessageHandler() {
                        /** {@inheritDoc} */
                        @Override
                        public void onMembershipEvent(MembershipEvent event) {
                            topologyService.onMembershipEvent(event);
                        }
                    })
                    .config(opts -> opts
                            .memberId(launchId.toString())
                            .memberAlias(consistentId)
                            .metadataCodec(METADATA_CODEC)
                    )
                    .transport(opts -> opts.transportFactory(transportConfig -> transport))
                    .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));

            shutdownFuture = cluster.onShutdown().toFuture();

            // resolve cyclic dependencies
            topologyService.setCluster(cluster);
            messagingService.setConnectionManager(connectionMgr);

            topologyService.addEventHandler(new TopologyEventHandler() {
                @Override
                public void onDisappeared(ClusterNode member) {
                    staleIds.markAsStale(member.id());
                }
            });

            cluster.startAwait();

            // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
            var localMembershipEvent = createAdded(cluster.member(), null, System.currentTimeMillis());

            topologyService.onMembershipEvent(localMembershipEvent);
        }

        /** {@inheritDoc} */
        @Override
        public void stop() {
            // Read the volatile fields once so that the checks and the calls below see the same references.
            ConnectionManager localConnectionMgr = connectionMgr;

            if (localConnectionMgr == null) {
                // start() has not created the connection manager, so there is nothing to stop.
                return;
            }

            localConnectionMgr.initiateStopping();

            ClusterImpl localCluster = cluster;

            // local member will be null, if cluster has not been started
            if (localCluster != null && localCluster.member() != null) {
                localCluster.shutdown();

                try {
                    shutdownFuture.get(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();

                    throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
                } catch (ExecutionException e) {
                    throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
                } catch (TimeoutException e) {
                    // Failed to leave gracefully
                    LOG.warn("Failed to wait for ScaleCube cluster shutdown [reason={}]", e, e.getMessage());
                }
            }

            // The connection manager is created and started before the cluster is, so it must be stopped even if
            // the cluster itself has never been started (for example, when start() failed half-way).
            localConnectionMgr.stop();

            // Messaging service checks connection manager's status before sending a message, so connection manager should be
            // stopped before messaging service
            messagingService.stop();
        }

        /** {@inheritDoc} */
        @Override
        public void beforeNodeStop() {
            stop();
        }

        /** {@inheritDoc} */
        @Override
        public boolean isStopped() {
            // The future is assigned in start() only; a service that has never been started is reported as not stopped.
            CompletableFuture<Void> localShutdownFuture = shutdownFuture;

            return localShutdownFuture != null && localShutdownFuture.isDone();
        }

        /** {@inheritDoc} */
        @Override
        public void updateMetadata(NodeMetadata metadata) {
            // Subscribing triggers the metadata update on the ScaleCube side; its result is not awaited here.
            cluster.updateMetadata(metadata).subscribe();

            topologyService.updateLocalMetadata(metadata);
        }
    };
}