in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java [440:503]
/**
 * Periodically detects failed peers and cleans up the state they leave behind.
 *
 * <p>About once per second, every peer in {@code heartbeatInfo} whose last heartbeat is older
 * than {@code failureDetectionPeriod} milliseconds is handled as follows:
 * <ul>
 *   <li>If the peer hosts logs (according to {@code peerLogs}), it is removed from
 *       {@code peers}. A CLOSED state-change request is then sent to the raft group of each
 *       hosted log. Logs that close successfully are dropped from the peer's set. Once the set
 *       is empty, the peer's {@code peerLogs} and {@code heartbeatInfo} entries are removed;
 *       logs that failed to close are retried on the next pass.</li>
 *   <li>All {@code avail} entries belonging to the peer are removed.</li>
 * </ul>
 *
 * <p>The loop runs until the thread is interrupted. The interrupt status is then restored and
 * the method returns.
 */
public void run() {
  while (true) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Restore the flag and stop; swallowing the interrupt would make this thread unstoppable.
      Thread.currentThread().interrupt();
      LOG.info("Peer failure detection thread interrupted, exiting");
      return;
    }
    try {
      long now = System.currentTimeMillis();
      // Iterate over a snapshot of the keys: heartbeatInfo is modified inside the loop
      // (and may also be modified by other threads).
      new ArrayList<>(heartbeatInfo.keySet()).forEach(raftPeer -> {
        Long heartbeatTimestamp = heartbeatInfo.get(raftPeer);
        // The entry may have been removed after the snapshot was taken.
        if (heartbeatTimestamp == null || (now - heartbeatTimestamp) <= failureDetectionPeriod) {
          return;
        }
        // Close the logs served by the peer, if any. A single get() avoids the
        // containsKey()/get() race.
        Set<LogName> logNames = peerLogs.get(raftPeer);
        if (logNames != null) {
          LOG.warn("Closing all logs hosted by peer {} because last heartbeat ({}ms) exceeds " +
              "the threshold ({}ms)", raftPeer, now - heartbeatTimestamp, failureDetectionPeriod);
          peers.remove(raftPeer);
          Iterator<LogName> itr = logNames.iterator();
          while (itr.hasNext()) {
            LogName logName = itr.next();
            RaftGroup group = map.get(logName);
            if (group == null) {
              // No group is mapped to this log any more (presumably it was deleted), so there
              // is nothing to close. Stop tracking it instead of failing the whole pass.
              LOG.warn("No raft group found for log {} hosted by failed peer {}; dropping it",
                  logName, raftPeer);
              itr.remove();
              continue;
            }
            try (RaftClient client = RaftClient.newBuilder()
                .setRaftGroup(group).setProperties(properties).build()) {
              LOG.warn("Peer {} in the group {} went down." +
                  " Hence closing the log {} serve by the group.", raftPeer, group, logName);
              RaftClientReply reply = client.io().send(
                  () -> LogServiceProtoUtil.
                      toChangeStateRequestProto(logName, LogStream.State.CLOSED, true)
                      .toByteString());
              LogServiceProtos.ChangeStateReplyProto message =
                  LogServiceProtos.ChangeStateReplyProto.parseFrom(
                      reply.getMessage().getContent());
              if (message.hasException()) {
                throw new IOException(message.getException().getErrorMsg());
              }
              itr.remove();
            } catch (IOException e) {
              LOG.warn("Failed to close log {} on peer {} failure.", logName, raftPeer, e);
            }
          }
          if (logNames.isEmpty()) {
            peerLogs.remove(raftPeer);
            heartbeatInfo.remove(raftPeer);
          } // else retry closing failed logs on next period.
        }
        // Remove the failed peer's groups from the available list.
        avail.removeIf(peerGroup -> peerGroup.getPeer().equals(raftPeer));
      });
    } catch (Exception e) {
      LOG.error(
          "Exception while closing logs and removing peer" +
              " from raft groups with Metadata Service on node failure", e);
    }
  }
}