public void run()

in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java [440:503]


        /**
         * Periodically scans the recorded peer heartbeats and cleans up after every peer whose
         * last heartbeat is older than {@code failureDetectionPeriod} milliseconds.
         *
         * <p>Once per second, for each such peer:
         * <ul>
         *   <li>if {@code peerLogs} has an entry for it, the peer is removed from {@code peers}
         *       and a {@code CLOSED} change-state request is sent to the group of each of its
         *       logs. Logs that were closed are dropped from the entry; when none remain the
         *       peer is also removed from {@code peerLogs} and {@code heartbeatInfo}. Logs that
         *       failed to close stay in the entry and are retried on the next pass;</li>
         *   <li>every element of {@code avail} whose peer equals it is removed.</li>
         * </ul>
         *
         * <p>The loop ends when the running thread is interrupted; the interrupt status is left
         * set so the owner of the thread can observe it. Any other exception aborts only the
         * current pass and is logged.
         */
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000);
                    final long now = System.currentTimeMillis();
                    // Walk a snapshot of the keys: the body below removes entries from
                    // heartbeatInfo, which must not happen while iterating the live key set.
                    new ArrayList<>(heartbeatInfo.keySet()).forEach(raftPeer -> {
                        final Long heartbeatTimestamp = heartbeatInfo.get(raftPeer);
                        if (heartbeatTimestamp == null) {
                            // Entry disappeared since the snapshot was taken; nothing to check.
                            return;
                        }
                        final long sinceLastHeartbeat = now - heartbeatTimestamp;
                        if (sinceLastHeartbeat <= failureDetectionPeriod) {
                            return;
                        }

                        // Close the logs served by the peer, if any.
                        final Set<LogName> logNames = peerLogs.get(raftPeer);
                        if (logNames != null) {
                            LOG.warn("Closing all logs hosted by peer {} because last heartbeat ({}ms) exceeds " +
                                "the threshold ({}ms)", raftPeer, sinceLastHeartbeat, failureDetectionPeriod);
                            peers.remove(raftPeer);
                            final Iterator<LogName> itr = logNames.iterator();
                            while (itr.hasNext()) {
                                final LogName logName = itr.next();
                                final RaftGroup group = map.get(logName);
                                try (RaftClient client = RaftClient.newBuilder()
                                    .setRaftGroup(group).setProperties(properties).build()) {
                                    LOG.warn("Peer {} in the group {} went down." +
                                            " Hence closing the log {} serve by the group.",
                                        raftPeer, group, logName);
                                    final RaftClientReply reply = client.io().send(
                                        () -> LogServiceProtoUtil
                                            .toChangeStateRequestProto(logName, LogStream.State.CLOSED, true)
                                            .toByteString());
                                    final LogServiceProtos.ChangeStateReplyProto message =
                                        LogServiceProtos.ChangeStateReplyProto.parseFrom(
                                            reply.getMessage().getContent());
                                    if (message.hasException()) {
                                        throw new IOException(message.getException().getErrorMsg());
                                    }
                                    // Only forget the log once it has actually been closed.
                                    itr.remove();
                                } catch (IOException e) {
                                    // Keep the log in the set so the close is retried next pass.
                                    LOG.warn("Failed to close log {} on peer {} failure.",
                                        logName, raftPeer, e);
                                }
                            }
                            if (logNames.isEmpty()) {
                                peerLogs.remove(raftPeer);
                                heartbeatInfo.remove(raftPeer);
                            } // else retry closing failed logs on next period.
                        }

                        // Remove the peer's groups from avail.
                        avail.removeIf(peerGroup -> peerGroup.getPeer().equals(raftPeer));
                    });
                } catch (InterruptedException e) {
                    // sleep() cleared the interrupt status; restore it so the loop condition
                    // terminates the task instead of treating the interrupt as a failure.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOG.error(
                            "Exception while closing logs and removing peer" +
                                    " from raft groups with Metadata Service on node failure", e);
                }
            }
        }