in safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java [176:238]
/**
 * Builds the runtime model backing a {@code @Bulkhead} declaration.
 *
 * <p>Both {@code bulkhead.value()} and {@code bulkhead.waitingTaskQueue()} are validated
 * first. Then, depending on {@code useThreads}, either a dedicated thread pool (with a bounded
 * waiting queue and metrics instrumentation) or a plain {@link Semaphore} is created; the
 * members belonging to the unused strategy are set to {@code null}.</p>
 *
 * @param disabled          stored on the model as-is; not consulted by this constructor
 * @param context           invocation context; only its method is read here, to name pool threads
 * @param bulkhead          annotation providing the concurrency limit and waiting queue size
 * @param useThreads        {@code true} to isolate calls in a dedicated thread pool,
 *                          {@code false} to guard them with a semaphore
 * @param callsAccepted     counter incremented each time the pool accepts a task
 * @param callsRejected     counter incremented each time the pool rejects a task
 * @param executionDuration histogram fed with the run time of each pooled task, in nanoseconds
 * @param waitingDuration   histogram fed with the delay between submission and start of each
 *                          pooled task, in nanoseconds
 * @throws FaultToleranceDefinitionException if {@code value} or {@code waitingTaskQueue}
 *                                           is not strictly positive
 */
private Model(final boolean disabled, final InvocationContext context,
              final Bulkhead bulkhead, final boolean useThreads,
              final FaultToleranceMetrics.Counter callsAccepted,
              final FaultToleranceMetrics.Counter callsRejected,
              final FaultToleranceMetrics.Histogram executionDuration,
              final FaultToleranceMetrics.Histogram waitingDuration) {
    this.disabled = disabled;
    this.value = bulkhead.value();
    if (this.value <= 0) {
        throw new FaultToleranceDefinitionException("Invalid value in @Bulkhead: " + value);
    }
    this.waitingQueue = bulkhead.waitingTaskQueue();
    if (this.waitingQueue <= 0) {
        // report the attribute that actually failed validation (previously printed `value`)
        throw new FaultToleranceDefinitionException(
                "Invalid waitingTaskQueue in @Bulkhead: " + waitingQueue);
    }
    this.callsAccepted = callsAccepted;
    this.callsRejected = callsRejected;
    this.executionDuration = executionDuration;
    this.waitingDuration = waitingDuration;
    this.useThreads = useThreads;
    if (this.useThreads) { // important: use a pool dedicated for that concern and not a reusable one
        // bounded queue: once it is full and all `value` workers are busy, the handler below rejects
        this.workQueue = new ArrayBlockingQueue<>(waitingQueue);
        this.pool = new ThreadPoolExecutor(value, value, 0L, MILLISECONDS, workQueue, new ThreadFactory() {
            // same group selection as the JDK default factory: security manager's group if any,
            // otherwise the group of the thread creating this model
            private final ThreadGroup group = ofNullable(System.getSecurityManager())
                    .map(SecurityManager::getThreadGroup)
                    .orElseGet(() -> Thread.currentThread().getThreadGroup());
            // `this` is the factory instance here, so the hash identifies this factory/pool
            private final String prefix = "org.apache.geronimo.safeguard.bulkhead@" +
                    System.identityHashCode(this) + "[" + context.getMethod() + "]-";
            private final AtomicLong counter = new AtomicLong();

            @Override
            public Thread newThread(final Runnable r) {
                return new Thread(group, r, prefix + counter.incrementAndGet());
            }
        }, (r, executor) -> {
            // rejection handler: count the rejection and surface it as a bulkhead failure
            callsRejected.inc();
            throw new BulkheadException("Can't accept task " + r);
        }) {
            @Override
            public void execute(final Runnable command) {
                final long submitted = System.nanoTime();
                super.execute(() -> {
                    final long start = System.nanoTime();
                    waitingDuration.update(start - submitted);
                    try {
                        command.run();
                    } finally {
                        // recorded even when the task throws
                        executionDuration.update(System.nanoTime() - start);
                    }
                });
                // only reached when super.execute did not throw, i.e. the task was not rejected
                callsAccepted.inc();
            }
        };
        this.semaphore = null;
    } else {
        this.workQueue = null;
        this.pool = null;
        this.semaphore = new Semaphore(value);
    }
}