in modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java [101:295]
/**
 * Creates a {@link ClusterService} whose membership is driven by a ScaleCube {@link ClusterImpl} and whose messaging
 * goes through a {@link DefaultMessagingService} on top of a {@link ConnectionManager}.
 *
 * <p>Nothing is started here: the connection manager, the node finder and the ScaleCube cluster are created and started
 * when {@code startAsync} is invoked on the returned service.
 *
 * @param consistentId Consistent ID of the local node; also used as the ScaleCube member alias.
 * @param networkConfiguration Network configuration; its current value is read once when the service is started.
 * @param nettyBootstrapFactory Bootstrap factory handed over to the connection manager.
 * @param serializationRegistry Registry of message serializers.
 * @param staleIds Registry in which IDs of nodes that disappeared from the topology are marked as stale.
 * @param clusterIdSupplier Cluster ID supplier handed over to the connection manager.
 * @param criticalWorkerRegistry Critical worker registry handed over to the messaging service.
 * @param failureProcessor Failure processor handed over to the messaging service.
 * @param channelTypeRegistry Channel type registry shared by the messaging service and the connection manager.
 * @param productVersionSource Product version source handed over to the connection manager.
 * @return New, not yet started cluster service.
 */
public ClusterService createClusterService(
        String consistentId,
        NetworkConfiguration networkConfiguration,
        NettyBootstrapFactory nettyBootstrapFactory,
        MessageSerializationRegistry serializationRegistry,
        StaleIds staleIds,
        ClusterIdSupplier clusterIdSupplier,
        CriticalWorkerRegistry criticalWorkerRegistry,
        FailureProcessor failureProcessor,
        ChannelTypeRegistry channelTypeRegistry,
        IgniteProductVersionSource productVersionSource
) {
    var topologyService = new ScaleCubeTopologyService();

    // Adding this handler as the first handler to make sure that StaleIds is at least as up-to-date as any
    // other component that watches topology events.
    topologyService.addEventHandler(new TopologyEventHandler() {
        @Override
        public void onDisappeared(ClusterNode member) {
            staleIds.markAsStale(member.id());
        }
    });

    var messageFactory = new NetworkMessagesFactory();

    UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();

    var messagingService = new DefaultMessagingService(
            consistentId,
            messageFactory,
            topologyService,
            staleIds,
            userObjectSerialization.descriptorRegistry(),
            userObjectSerialization.marshaller(),
            criticalWorkerRegistry,
            failureProcessor,
            channelTypeRegistry
    );

    return new AbstractClusterService(consistentId, topologyService, messagingService, serializationRegistry) {
        /** ScaleCube cluster; assigned at the very end of {@link #startAsync}, {@code null} before that. */
        private volatile ClusterImpl cluster;

        /** Connection manager; assigned at the beginning of {@link #startAsync}, {@code null} before that. */
        private volatile ConnectionManager connectionMgr;

        /** Completes when the ScaleCube cluster has shut down; assigned in {@link #startAsync} before {@link #cluster}. */
        private volatile CompletableFuture<Void> shutdownFuture;

        /**
         * Starts the connection manager, the messaging service, the node finder and the ScaleCube cluster.
         *
         * <p>The call blocks until the ScaleCube cluster has started ({@code startAwait()}), so the returned future
         * is already completed.
         */
        @Override
        public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
            var serializationService = new SerializationService(serializationRegistry, userObjectSerialization);

            UUID launchId = UUID.randomUUID();

            NetworkView configView = networkConfiguration.value();

            ConnectionManager connectionMgr = new ConnectionManager(
                    configView,
                    serializationService,
                    consistentId,
                    launchId,
                    nettyBootstrapFactory,
                    staleIds,
                    clusterIdSupplier,
                    channelTypeRegistry,
                    productVersionSource
            );

            // Published before start() so that stopAsync() can clean up even if the start sequence fails later.
            this.connectionMgr = connectionMgr;

            connectionMgr.start();
            messagingService.start();

            Address scalecubeLocalAddress = prepareAddress(connectionMgr.localAddress());

            topologyService.addEventHandler(new TopologyEventHandler() {
                @Override
                public void onDisappeared(ClusterNode member) {
                    connectionMgr.handleNodeLeft(member.id());
                }
            });

            var transport = new ScaleCubeDirectMarshallerTransport(
                    scalecubeLocalAddress,
                    messagingService,
                    topologyService,
                    messageFactory
            );

            ClusterConfig clusterConfig = clusterConfig(configView.membership());

            NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder(), nodeName(), connectionMgr.localAddress());
            finder.start();

            ClusterImpl cluster = new ClusterImpl(clusterConfig)
                    .handler(cl -> new ClusterMessageHandler() {
                        @Override
                        public void onMembershipEvent(MembershipEvent event) {
                            topologyService.onMembershipEvent(event);
                        }
                    })
                    .config(opts -> opts
                            .memberId(launchId.toString())
                            .memberAlias(consistentId)
                            .metadataCodec(METADATA_CODEC)
                    )
                    .transport(opts -> opts.transportFactory(transportConfig -> transport))
                    .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));

            Member localMember = createLocalMember(scalecubeLocalAddress, launchId, clusterConfig);

            ClusterNode localNode = new ClusterNodeImpl(
                    UUID.fromString(localMember.id()),
                    consistentId,
                    new NetworkAddress(localMember.address().host(), localMember.address().port())
            );

            connectionMgr.setLocalNode(localNode);

            // whenComplete (rather than thenAccept) closes the node finder even if the shutdown signal completes
            // exceptionally; the outcome of the shutdown is still propagated through the resulting future.
            this.shutdownFuture = cluster.onShutdown().toFuture()
                    .whenComplete((ignoredResult, ignoredError) -> finder.close());

            // resolve cyclic dependencies
            topologyService.setCluster(cluster);
            messagingService.setConnectionManager(connectionMgr);

            cluster.startAwait();

            assert cluster.member().equals(localMember) : "Expected local member from cluster " + cluster.member()
                    + " to be equal to the precomputed one " + localMember;

            // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
            var localMembershipEvent = createAdded(cluster.member(), null, System.currentTimeMillis());
            topologyService.onMembershipEvent(localMembershipEvent);

            this.cluster = cluster;

            return nullCompletedFuture();
        }

        /**
         * Leaves the ScaleCube cluster (waiting up to 10 seconds for the shutdown) and then stops the connection
         * manager and the messaging service.
         *
         * <p>The connection manager and the messaging service are stopped even if leaving the cluster fails.
         * The first failure is reported through the returned future, later ones are attached to it as suppressed.
         */
        @Override
        public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
            // Snapshot of the volatile field: the same instance is used for every call below.
            ConnectionManager localConnectionMgr = connectionMgr;

            Throwable failure = null;

            try {
                if (localConnectionMgr != null) {
                    localConnectionMgr.initiateStopping();
                }

                leaveCluster();
            } catch (Throwable t) {
                failure = t;
            }

            try {
                if (localConnectionMgr != null) {
                    localConnectionMgr.stop();
                }

                // Messaging service checks connection manager's status before sending a message, so connection manager
                // should be stopped before messaging service
                messagingService.stop();
            } catch (Throwable t) {
                if (failure == null) {
                    failure = t;
                } else {
                    failure.addSuppressed(t);
                }
            }

            if (failure != null) {
                return failedFuture(failure);
            }

            return nullCompletedFuture();
        }

        /**
         * Shuts the ScaleCube cluster down, if it has been started, and waits up to 10 seconds for that to complete.
         * A timeout is only logged; an interruption or a failed shutdown is rethrown as {@link IgniteInternalException}.
         */
        private void leaveCluster() {
            ClusterImpl localCluster = cluster;

            // Local member will be null, if cluster has not been started.
            if (localCluster == null || localCluster.member() == null) {
                return;
            }

            localCluster.shutdown();

            try {
                // Read after the cluster field: startAsync assigns shutdownFuture before cluster, so it is non-null here.
                shutdownFuture.get(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();

                throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
            } catch (ExecutionException e) {
                throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
            } catch (TimeoutException e) {
                // Failed to leave gracefully.
                LOG.warn("Failed to wait for ScaleCube cluster shutdown [reason={}]", e, e.getMessage());
            }
        }

        /** Stops the service synchronously by delegating to {@link #stopAsync} and waiting for its result. */
        @Override
        public void beforeNodeStop() {
            this.stopAsync(new ComponentContext()).join();
        }

        /**
         * Tells whether the ScaleCube cluster has shut down.
         *
         * @return {@code true} if the shutdown future has completed; {@code false} if it has not, or if the service
         *         has never been started (in which case there is no shutdown future yet).
         */
        @Override
        public boolean isStopped() {
            CompletableFuture<Void> localShutdownFuture = shutdownFuture;

            return localShutdownFuture != null && localShutdownFuture.isDone();
        }

        /**
         * Publishes new local node metadata to the ScaleCube cluster and to the topology service.
         *
         * <p>Requires the service to be started: the cluster reference is only assigned at the end of {@link #startAsync}.
         */
        @Override
        public void updateMetadata(NodeMetadata metadata) {
            cluster.updateMetadata(metadata).subscribe();

            topologyService.updateLocalMetadata(metadata);
        }
    };
}