in servicetalk-benchmarks/src/jmh/java/io/servicetalk/http/netty/NettyPipelinedConnectionBenchmark.java [150:267]
private NettyConnection<Object, Object> newNettyConnection() {
return new NettyConnection<Object, Object>() {
private final AtomicInteger offloadCount = new AtomicInteger();
@Override
public Publisher<Object> read() {
return Publisher.empty();
}
@Override
public Completable write(final Publisher<Object> write) {
return new Completable() {
@Override
protected void handleSubscribe(final CompletableSource.Subscriber subscriber) {
// Avoid using operators to keep benchmark as focused as possible on the unit under test.
toSource(write).subscribe(new PublisherSource.Subscriber<Object>() {
@Override
public void onSubscribe(final PublisherSource.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(@Nullable final Object o) {
}
@Override
public void onError(final Throwable t) {
subscriber.onError(t);
}
@Override
public void onComplete() {
// Avoid stack-overflow by periodically offloading the completion notification.
if ((offloadCount.incrementAndGet() & EXECUTOR_STACK_PROTECT_MASK) == 0) {
executorService.execute(subscriber::onComplete);
} else {
subscriber.onComplete();
}
}
});
}
};
}
@Override
public Completable write(final Publisher<Object> write,
final Supplier<FlushStrategy> flushStrategySupplier,
final Supplier<WriteDemandEstimator> demandEstimatorSupplier) {
return write(write);
}
@Override
public Cancellable updateFlushStrategy(final FlushStrategyProvider strategyProvider) {
return IGNORE_CANCEL;
}
@Override
public FlushStrategy defaultFlushStrategy() {
return FlushStrategies.defaultFlushStrategy();
}
@Override
public Single<Throwable> transportError() {
return Single.never();
}
@Override
public Completable onClosing() {
return Completable.never();
}
@Override
public Channel nettyChannel() {
throw new UnsupportedOperationException();
}
@Override
public SocketAddress localAddress() {
return new InetSocketAddress(0);
}
@Override
public SocketAddress remoteAddress() {
return new InetSocketAddress(0);
}
@Nullable
@Override
public SSLSession sslSession() {
return null;
}
@Override
public ExecutionContext<?> executionContext() {
return GlobalExecutionContext.globalExecutionContext();
}
@Nullable
@Override
public <T> T socketOption(final SocketOption<T> option) {
return null;
}
@Override
public Protocol protocol() {
return HttpProtocolVersion.HTTP_1_1;
}
@Override
public Completable onClose() {
return Completable.never();
}
@Override
public Completable closeAsync() {
return Completable.never();
}
};
}