public ClusterService createClusterService()

in modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java [79:223]


    public ClusterService createClusterService(
            String consistentId,
            NetworkConfiguration networkConfiguration,
            NettyBootstrapFactory nettyBootstrapFactory,
            MessageSerializationRegistry serializationRegistry,
            StaleIds staleIds
    ) {
        var messageFactory = new NetworkMessagesFactory();

        var topologyService = new ScaleCubeTopologyService();

        UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();

        var messagingService = new DefaultMessagingService(
                consistentId,
                messageFactory,
                topologyService,
                userObjectSerialization.descriptorRegistry(),
                userObjectSerialization.marshaller()
        );

        return new AbstractClusterService(consistentId, topologyService, messagingService, serializationRegistry) {

            private volatile ClusterImpl cluster;

            private volatile ConnectionManager connectionMgr;

            private volatile CompletableFuture<Void> shutdownFuture;

            /** {@inheritDoc} */
            @Override
            public void start() {
                var serializationService = new SerializationService(serializationRegistry, userObjectSerialization);

                UUID launchId = UUID.randomUUID();

                NetworkView configView = networkConfiguration.value();

                connectionMgr = new ConnectionManager(
                        configView,
                        serializationService,
                        launchId,
                        consistentId,
                        nettyBootstrapFactory,
                        staleIds
                );

                connectionMgr.start();

                var transport = new ScaleCubeDirectMarshallerTransport(
                        connectionMgr.localAddress(),
                        messagingService,
                        topologyService,
                        messageFactory
                );

                NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
                cluster = new ClusterImpl(clusterConfig(configView.membership()))
                        .handler(cl -> new ClusterMessageHandler() {
                            /** {@inheritDoc} */
                            @Override
                            public void onMembershipEvent(MembershipEvent event) {
                                topologyService.onMembershipEvent(event);
                            }
                        })
                        .config(opts -> opts
                                .memberId(launchId.toString())
                                .memberAlias(consistentId)
                                .metadataCodec(METADATA_CODEC)
                        )
                        .transport(opts -> opts.transportFactory(transportConfig -> transport))
                        .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));

                shutdownFuture = cluster.onShutdown().toFuture();

                // resolve cyclic dependencies
                topologyService.setCluster(cluster);
                messagingService.setConnectionManager(connectionMgr);

                topologyService.addEventHandler(new TopologyEventHandler() {
                    @Override
                    public void onDisappeared(ClusterNode member) {
                        staleIds.markAsStale(member.id());
                    }
                });

                cluster.startAwait();

                // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
                var localMembershipEvent = createAdded(cluster.member(), null, System.currentTimeMillis());
                topologyService.onMembershipEvent(localMembershipEvent);
            }

            /** {@inheritDoc} */
            @Override
            public void stop() {
                // local member will be null, if cluster has not been started
                if (cluster == null || cluster.member() == null) {
                    return;
                }

                connectionMgr.initiateStopping();

                cluster.shutdown();

                try {
                    shutdownFuture.get(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();

                    throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
                } catch (ExecutionException e) {
                    throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
                } catch (TimeoutException e) {
                    // Failed to leave gracefully
                    LOG.warn("Failed to wait for ScaleCube cluster shutdown [reason={}]", e, e.getMessage());
                }

                connectionMgr.stop();

                // Messaging service checks connection manager's status before sending a message, so connection manager should be
                // stopped before messaging service
                messagingService.stop();
            }

            /** {@inheritDoc} */
            @Override
            public void beforeNodeStop() {
                stop();
            }

            /** {@inheritDoc} */
            @Override
            public boolean isStopped() {
                return shutdownFuture.isDone();
            }

            @Override
            public void updateMetadata(NodeMetadata metadata) {
                cluster.updateMetadata(metadata).subscribe();
                topologyService.updateLocalMetadata(metadata);
            }

        };
    }