harry-core/src/harry/concurrent/InfiniteLoopExecutor.java (130 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package harry.concurrent; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static harry.concurrent.Clock.Global.nanoTime; import static harry.concurrent.Condition.newOneTimeCondition; import static harry.concurrent.InfiniteLoopExecutor.InternalState.SHUTTING_DOWN_NOW; import static harry.concurrent.InfiniteLoopExecutor.InternalState.TERMINATED; import static harry.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED; import static harry.concurrent.Interruptible.State.INTERRUPTED; import static harry.concurrent.Interruptible.State.NORMAL; import static harry.concurrent.Interruptible.State.SHUTTING_DOWN; /** * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. */ public class InfiniteLoopExecutor implements Interruptible { private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class); public enum InternalState { SHUTTING_DOWN_NOW, TERMINATED } public enum SimulatorSafe { SAFE, UNSAFE } public enum Daemon { DAEMON, NON_DAEMON } public enum Interrupts { SYNCHRONIZED, UNSYNCHRONIZED } private static final AtomicReferenceFieldUpdater<InfiniteLoopExecutor, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(InfiniteLoopExecutor.class, Object.class, "state"); private final Thread thread; private final Task task; private volatile Object state = NORMAL; private final Consumer<Thread> interruptHandler; private final Condition isTerminated = newOneTimeCondition(); public InfiniteLoopExecutor(String name, Task task, Daemon daemon) { this(ExecutorFactory.Global.executorFactory(), name, task, daemon, UNSYNCHRONIZED); } public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon daemon) { this(factory, name, task, daemon, UNSYNCHRONIZED); } public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon daemon, Interrupts interrupts) { this.task = task; this.thread = factory.startThread(name, this::loop, daemon); this.interruptHandler = interrupts == Interrupts.SYNCHRONIZED ? interruptHandler(task) : Thread::interrupt; } public InfiniteLoopExecutor(BiFunction<String, Runnable, Thread> threadStarter, String name, Task task, Interrupts interrupts) { this.task = task; this.thread = threadStarter.apply(name, this::loop); this.interruptHandler = interrupts == Interrupts.SYNCHRONIZED ? interruptHandler(task) : Thread::interrupt; } private static Consumer<Thread> interruptHandler(final Object monitor) { return thread -> { synchronized (monitor) { thread.interrupt(); } }; } private void loop() { boolean interrupted = false; try { while (true) { try { Object cur = state; if (cur == SHUTTING_DOWN_NOW) break; interrupted |= Thread.interrupted(); if (cur == NORMAL && interrupted) cur = INTERRUPTED; task.run((State) cur); interrupted = false; if (cur == SHUTTING_DOWN) break; } catch (TerminateException ignore) { break; } catch (UncheckedInterruptedException | InterruptedException ignore) { interrupted = true; } catch (Throwable t) { logger.error("Exception thrown by runnable, continuing with loop", t); } } } finally { state = TERMINATED; isTerminated.signal(); } } public void interrupt() { interruptHandler.accept(thread); } public void shutdown() { stateUpdater.updateAndGet(this, cur -> cur != TERMINATED && cur != SHUTTING_DOWN_NOW ? SHUTTING_DOWN : cur); // TODO: InfiniteLoopExecutor should let the threads quiesce themselves rather then send interrupts //interruptHandler.accept(thread); } public Object shutdownNow() { stateUpdater.updateAndGet(this, cur -> cur != TERMINATED ? SHUTTING_DOWN_NOW : TERMINATED); interruptHandler.accept(thread); return null; } @Override public boolean isTerminated() { return state == TERMINATED; } public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException { if (isTerminated()) return true; long deadlineNanos = nanoTime() + unit.toNanos(time); isTerminated.awaitUntil(deadlineNanos); return isTerminated(); } public boolean isAlive() { return this.thread.isAlive(); } }