public ClusterService createClusterService()

in modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java [101:295]


    public ClusterService createClusterService(
            String consistentId,
            NetworkConfiguration networkConfiguration,
            NettyBootstrapFactory nettyBootstrapFactory,
            MessageSerializationRegistry serializationRegistry,
            StaleIds staleIds,
            ClusterIdSupplier clusterIdSupplier,
            CriticalWorkerRegistry criticalWorkerRegistry,
            FailureProcessor failureProcessor,
            ChannelTypeRegistry channelTypeRegistry,
            IgniteProductVersionSource productVersionSource
    ) {
        var topologyService = new ScaleCubeTopologyService();

        // Adding this handler as the first handler to make sure that StaleIds is at least up-to-date as any
        // other component that watches topology events.
        topologyService.addEventHandler(new TopologyEventHandler() {
            @Override
            public void onDisappeared(ClusterNode member) {
                staleIds.markAsStale(member.id());
            }
        });

        var messageFactory = new NetworkMessagesFactory();

        UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();

        var messagingService = new DefaultMessagingService(
                consistentId,
                messageFactory,
                topologyService,
                staleIds,
                userObjectSerialization.descriptorRegistry(),
                userObjectSerialization.marshaller(),
                criticalWorkerRegistry,
                failureProcessor,
                channelTypeRegistry
        );

        return new AbstractClusterService(consistentId, topologyService, messagingService, serializationRegistry) {

            private volatile ClusterImpl cluster;

            private volatile ConnectionManager connectionMgr;

            private volatile CompletableFuture<Void> shutdownFuture;

            @Override
            public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
                var serializationService = new SerializationService(serializationRegistry, userObjectSerialization);

                UUID launchId = UUID.randomUUID();

                NetworkView configView = networkConfiguration.value();

                ConnectionManager connectionMgr = new ConnectionManager(
                        configView,
                        serializationService,
                        consistentId,
                        launchId,
                        nettyBootstrapFactory,
                        staleIds,
                        clusterIdSupplier,
                        channelTypeRegistry,
                        productVersionSource
                );
                this.connectionMgr = connectionMgr;

                connectionMgr.start();
                messagingService.start();

                Address scalecubeLocalAddress = prepareAddress(connectionMgr.localAddress());

                topologyService.addEventHandler(new TopologyEventHandler() {
                    @Override
                    public void onDisappeared(ClusterNode member) {
                        connectionMgr.handleNodeLeft(member.id());
                    }
                });

                var transport = new ScaleCubeDirectMarshallerTransport(
                        scalecubeLocalAddress,
                        messagingService,
                        topologyService,
                        messageFactory
                );

                ClusterConfig clusterConfig = clusterConfig(configView.membership());
                NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder(), nodeName(), connectionMgr.localAddress());
                finder.start();

                ClusterImpl cluster = new ClusterImpl(clusterConfig)
                        .handler(cl -> new ClusterMessageHandler() {
                            @Override
                            public void onMembershipEvent(MembershipEvent event) {
                                topologyService.onMembershipEvent(event);
                            }
                        })
                        .config(opts -> opts
                                .memberId(launchId.toString())
                                .memberAlias(consistentId)
                                .metadataCodec(METADATA_CODEC)
                        )
                        .transport(opts -> opts.transportFactory(transportConfig -> transport))
                        .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));

                Member localMember = createLocalMember(scalecubeLocalAddress, launchId, clusterConfig);
                ClusterNode localNode = new ClusterNodeImpl(
                        UUID.fromString(localMember.id()),
                        consistentId,
                        new NetworkAddress(localMember.address().host(), localMember.address().port())
                );
                connectionMgr.setLocalNode(localNode);

                this.shutdownFuture = cluster.onShutdown().toFuture()
                        .thenAccept(v -> finder.close());

                // resolve cyclic dependencies
                topologyService.setCluster(cluster);
                messagingService.setConnectionManager(connectionMgr);

                cluster.startAwait();

                assert cluster.member().equals(localMember) : "Expected local member from cluster " + cluster.member()
                        + " to be equal to the precomputed one " + localMember;

                // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
                var localMembershipEvent = createAdded(cluster.member(), null, System.currentTimeMillis());
                topologyService.onMembershipEvent(localMembershipEvent);

                this.cluster = cluster;

                return nullCompletedFuture();
            }

            @Override
            public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
                try {
                    ConnectionManager localConnectionMgr = connectionMgr;

                    if (localConnectionMgr != null) {
                        localConnectionMgr.initiateStopping();
                    }

                    // Local member will be null, if cluster has not been started.
                    if (cluster != null && cluster.member() != null) {
                        cluster.shutdown();

                        try {
                            shutdownFuture.get(10, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();

                            throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
                        } catch (ExecutionException e) {
                            throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
                        } catch (TimeoutException e) {
                            // Failed to leave gracefully.
                            LOG.warn("Failed to wait for ScaleCube cluster shutdown [reason={}]", e, e.getMessage());
                        }

                    }

                    if (localConnectionMgr != null) {
                        localConnectionMgr.stop();
                    }

                    // Messaging service checks connection manager's status before sending a message, so connection manager should be
                    // stopped before messaging service
                    messagingService.stop();

                    return nullCompletedFuture();
                } catch (Throwable t) {
                    return failedFuture(t);
                }
            }

            @Override
            public void beforeNodeStop() {
                this.stopAsync(new ComponentContext()).join();
            }

            @Override
            public boolean isStopped() {
                return shutdownFuture.isDone();
            }

            @Override
            public void updateMetadata(NodeMetadata metadata) {
                cluster.updateMetadata(metadata).subscribe();
                topologyService.updateLocalMetadata(metadata);
            }

        };
    }