in ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java [305:362]
/**
 * Worker loop: repeatedly polls the task queue (waiting up to {@code ONE_SECOND}
 * per poll) and executes each dequeued task for as long as {@code running} is true.
 *
 * <p>Failure handling, as implemented below:
 * <ul>
 *   <li>An {@link IOException} for a task whose end index is below
 *       {@code lastWrittenIndex} is only logged, and the task is still marked done.</li>
 *   <li>Any other {@link IOException} fails the task. The first such failure is
 *       remembered, and every later task is not executed; the remembered exception
 *       is thrown in its place and goes through the same handling.</li>
 *   <li>An {@link InterruptedException} restores the interrupt flag and ends the loop.</li>
 *   <li>Any other exception is logged; if the worker is still running, the server
 *       (when present) is closed, and the loop carries on until {@code running}
 *       becomes false.</li>
 * </ul>
 *
 * <p>Once the loop ends, whatever is left in the queue is cleared with
 * {@code Task::discard}.
 */
private void run() {
  // Set on the first task failure that is not ignored; null until then.
  RaftLogIOException firstFailure = null;
  CodeInjectionForTesting.execute(RUN_WORKER, server == null ? null : server.getId(), null, queue);

  while (running) {
    try {
      final Task next = queue.poll(ONE_SECOND);
      if (next == null) {
        // Nothing was dequeued by this poll; go back and re-check the running flag.
        continue;
      }
      next.stopTimerOnDequeue();

      try {
        if (firstFailure != null) {
          // A previous task already failed: do not execute, reuse the recorded failure.
          throw firstFailure;
        }
        try (UncheckedAutoCloseable ignored = raftLogMetrics.startTaskExecutionTimer(next.getClass())) {
          next.execute();
        }
      } catch (IOException ioe) {
        if (next.getEndIndex() >= lastWrittenIndex) {
          next.failed(ioe);
          if (firstFailure == null) {
            firstFailure = new RaftLogIOException("Log already failed"
                + " at index " + next.getEndIndex()
                + " for task " + next, ioe);
          }
          // A failed task must not be reported as done.
          continue;
        }
        // The task ends before lastWrittenIndex: log the error and fall through to done().
        LOG.info("Ignore IOException when handling task " + next
            + " which is smaller than the lastWrittenIndex."
            + " There should be a snapshot installed.", ioe);
      }
      next.done();
    } catch (InterruptedException ie) {
      // Restore the interrupt status before doing anything else.
      Thread.currentThread().interrupt();
      final String workerName = Thread.currentThread().getName();
      if (running) {
        LOG.warn("{} got interrupted while still running", workerName);
      }
      LOG.info(workerName
          + " was interrupted, exiting. There are " + queue.getNumElements()
          + " tasks remaining in the queue.");
      break;
    } catch (Exception ex) {
      final String workerName = Thread.currentThread().getName();
      if (running) {
        // Unexpected failure while still running: log it and close the server if there is one.
        LOG.error("{} hit exception", workerName, ex);
        Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
      } else {
        LOG.info("{} got closed and hit exception", workerName, ex);
      }
    }
  }

  // The loop has ended: discard every task still sitting in the queue.
  queue.clear(Task::discard);
}