in mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java [227:418]
/**
 * Builds the per-connection write pipeline for a client and returns it as an Observable.
 *
 * <p>When the returned Observable is subscribed, the connection is added to the
 * {@code connectionManager} and {@code connectionSubscribeCallback} (if non-null) is invoked.
 * Data pushed to the connection's subject is collected in 200 ms windows; each non-empty window
 * is encoded into a single byte block (see {@link #encodeBatchPayload}) and written with
 * {@code writeBytesAndFlush}, retried up to {@code writeRetryCount} times. A window that arrives
 * while the channel is inactive or not writable is reported through {@code failedToWriteBatch}
 * and not written.
 *
 * <p>Independently of subscription, this method:
 * <ul>
 *   <li>schedules a check every 10 seconds (only when {@code maxNotWritableTimeSec > 0}) that
 *       closes the writer once the channel has been non-writable for longer than that limit;</li>
 *   <li>attaches a listener to the channel's close future that removes the connection from the
 *       {@code connectionManager}, runs {@code connectionCleanup} and cancels the check.</li>
 * </ul>
 *
 * @param writer channel writer used to send bytes and to obtain the underlying channel
 * @param host remote host, used for the connection record and the fallback id
 * @param port remote port, used for the connection record and the fallback id
 * @param groupId client group id; defaults to {@code id} when null or empty. Also used as the
 *        client-id metrics tag
 * @param slotId slot id; defaults to {@code id} when null or empty
 * @param id connection id; defaults to {@code host_port_currentTimeMillis} when null or empty
 * @param lastWriteTime if non-null and {@code applicationHeartbeats} is set, updated with the
 *        current time after each completed write
 * @param applicationHeartbeats gates the {@code lastWriteTime} update
 * @param heartbeatSubscription handed to {@code connectionCleanup} when the channel closes
 * @param applySampling when true, the stream is sampled every {@code samplingRateMSec} and only
 *        the last element of each sampled list is kept
 * @param samplingRateMSec sampling period in milliseconds
 * @param metaMsgSubject handed to {@code failedToWriteBatch} for dropped batches
 * @param metaMsgSubscription handed to {@code connectionCleanup} when the channel closes
 * @param predicate handed to the {@code AsyncConnection} created for this client
 * @param connectionClosedCallback handed to {@code connectionCleanup} when the channel closes
 * @param legacyMsgProcessedCounter if non-null, incremented by the batch size on completed writes
 * @param legacyDroppedWrites handed to {@code failedToWriteBatch} for dropped batches
 * @param connectionSubscribeCallback if non-null, invoked when the returned Observable is subscribed
 * @param compressOutput when true together with {@code isSSE}, the window is compressed and
 *        base64-encoded into one framed event; it has no effect when {@code isSSE} is false
 * @param isSSE when true, output is framed with {@code prefix} / {@code nwnw}
 * @param delimiter optional delimiter forwarded to the compression utility; may be null
 * @param availabilityZone handed to the {@code AsyncConnection} created for this client
 * @return the write pipeline; nothing is added to the connection manager until it is subscribed
 */
protected Observable<Void> manageConnectionWithCompression(final DefaultChannelWriter<R> writer, String host, int port,
        String groupId, String slotId, String id, final AtomicLong lastWriteTime, final boolean applicationHeartbeats,
        final Subscription heartbeatSubscription, boolean applySampling, long samplingRateMSec,
        final SerializedSubject<String, String> metaMsgSubject, final Subscription metaMsgSubscription,
        Func1<T, Boolean> predicate, final Action0 connectionClosedCallback,
        final Counter legacyMsgProcessedCounter, final Counter legacyDroppedWrites,
        final Action0 connectionSubscribeCallback, boolean compressOutput, boolean isSSE,
        byte[] delimiter, String availabilityZone) {
    // Fill in missing identifiers: id falls back to host/port/time, the other two fall back to id.
    if (id == null || id.isEmpty()) {
        id = host + "_" + port + "_" + System.currentTimeMillis();
    }
    if (slotId == null || slotId.isEmpty()) {
        slotId = id;
    }
    if (groupId == null || groupId.isEmpty()) {
        groupId = id;
    }

    final BasicTag clientIdTag = new BasicTag(CLIENT_ID_TAG_NAME, Optional.ofNullable(groupId).orElse("none"));

    SerializedSubject<List<byte[]>, List<byte[]>> subject
            = new SerializedSubject<>(PublishSubject.<List<byte[]>>create());
    Observable<List<byte[]>> observable = subject.lift(new DropOperator<>("batch_writes", clientIdTag));
    if (applySampling) {
        observable =
                observable
                        .sample(samplingRateMSec, TimeUnit.MILLISECONDS)
                        .map((List<byte[]> list) -> {
                            // get most recent item from sample
                            List<byte[]> singleItem = new LinkedList<>();
                            if (!list.isEmpty()) {
                                singleItem.add(list.get(list.size() - 1));
                            }
                            return singleItem;
                        });
    }

    // Read the counters from the instance returned by the registry rather than from the locally
    // built one. NOTE(review): presumably registerAndGet returns the already-registered Metrics
    // when this id is taken (e.g. a second connection of the same client group); incrementing the
    // local instance in that case would update counters that are never published - confirm
    // against MetricsRegistry.
    final Metrics writableMetrics = metricsRegistry.registerAndGet(
            new Metrics.Builder()
                    .id("PushServer", clientIdTag)
                    .addCounter("channelWritable")
                    .addCounter("channelNotWritable")
                    .addCounter("channelNotWritableTimeout")
                    .build());
    final Counter channelWritableCounter = writableMetrics.getCounter("channelWritable");
    final Counter channelNotWritableCounter = writableMetrics.getCounter("channelNotWritable");
    final Counter channelNotWritableTimeoutCounter = writableMetrics.getCounter("channelNotWritableTimeout");

    // Timestamp of the last moment the channel was seen writable; refreshed both by the periodic
    // check below and by every successful writability test on the write path.
    final AtomicLong lastWritableTS = new AtomicLong(System.currentTimeMillis());
    final Future<?> writableCheck;
    if (maxNotWritableTimeSec > 0) {
        writableCheck = scheduledExecutorService.scheduleAtFixedRate(
                () -> {
                    long currentTime = System.currentTimeMillis();
                    if (writer.getChannel().isWritable()) {
                        channelWritableCounter.increment();
                        lastWritableTS.set(currentTime);
                    } else if (currentTime - lastWritableTS.get() > TimeUnit.SECONDS.toMillis(maxNotWritableTimeSec)) {
                        logger.warn("Closing connection due to channel not writable for more than {} secs", maxNotWritableTimeSec);
                        channelNotWritableTimeoutCounter.increment();
                        try {
                            writer.close();
                        } catch (Throwable ex) {
                            // Best effort: a failed close is logged and the check keeps running.
                            logger.error("Failed to close connection.", ex);
                        }
                    } else {
                        channelNotWritableCounter.increment();
                    }
                },
                0,
                10,
                TimeUnit.SECONDS
        );
    } else {
        writableCheck = null;
    }

    final AsyncConnection<T> connection = new AsyncConnection<T>(host,
            port, id, slotId, groupId, subject, predicate, availabilityZone);
    final Channel channel = writer.getChannel();
    channel.closeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
        @Override
        public void operationComplete(io.netty.util.concurrent.Future<Void> future) throws Exception {
            connectionManager.remove(connection);
            connectionCleanup(heartbeatSubscription, connectionClosedCallback, metaMsgSubscription);
            // Callback from the channel is closed, we don't need to check channel status anymore.
            if (writableCheck != null) {
                writableCheck.cancel(false);
            }
        }
    });

    return
            observable
                    .doOnSubscribe(() -> {
                        connectionManager.add(connection);
                        if (connectionSubscribeCallback != null) {
                            connectionSubscribeCallback.call();
                        }
                    }) // per connection buffer
                    .lift(new DisableBackPressureOperator<List<byte[]>>())
                    .buffer(200, TimeUnit.MILLISECONDS)
                    .flatMap((List<List<byte[]>> bufferOfBuffers) -> {
                        // Empty time window: nothing to write.
                        if (bufferOfBuffers == null || bufferOfBuffers.isEmpty()) {
                            return Observable.empty();
                        }

                        int size = 0;
                        for (List<byte[]> buffer : bufferOfBuffers) {
                            size += buffer.size();
                        }
                        final int batchSize = size;
                        // Counted as processed whether or not the write is attempted.
                        processedWrites.increment(batchSize);

                        if (!channel.isActive() || !channel.isWritable()) {
                            // connection is not active or writable
                            failedToWriteBatch(connection, batchSize, legacyDroppedWrites, metaMsgSubject);
                            return Observable.empty();
                        }

                        lastWritableTS.set(System.currentTimeMillis());
                        final byte[] payload = encodeBatchPayload(bufferOfBuffers, compressOutput, isSSE, delimiter);
                        return
                                writer
                                        .writeBytesAndFlush(payload)
                                        .retry(writeRetryCount)
                                        .doOnError((Throwable t1) -> failedToWriteBatch(connection, batchSize, legacyDroppedWrites, metaMsgSubject))
                                        .doOnCompleted(() -> {
                                            if (applicationHeartbeats && lastWriteTime != null) {
                                                lastWriteTime.set(System.currentTimeMillis());
                                            }
                                            if (legacyMsgProcessedCounter != null) {
                                                legacyMsgProcessedCounter.increment(batchSize);
                                            }
                                            successfulWrites.increment(batchSize);
                                            connectionManager.successfulWrites(connection, batchSize);
                                        })
                                        .doOnTerminate(() -> batchWriteSize.set(batchSize));
                    });
}

/**
 * Encodes one buffered window into the single byte block that is written to the channel.
 *
 * <ul>
 *   <li>SSE + compression: the whole window is compressed/base64-encoded and wrapped once as
 *       {@code prefix + data + nwnw};</li>
 *   <li>SSE without compression: every message is wrapped individually as
 *       {@code prefix + message + nwnw};</li>
 *   <li>non-SSE: messages are concatenated as-is ({@code compressOutput} is ignored).</li>
 * </ul>
 */
private byte[] encodeBatchPayload(List<List<byte[]>> bufferOfBuffers, boolean compressOutput, boolean isSSE,
        byte[] delimiter) {
    if (isSSE && compressOutput) {
        boolean useSnappy = true;
        byte[] compressedData = delimiter == null
                ? CompressionUtils.compressAndBase64EncodeBytes(bufferOfBuffers, useSnappy)
                : CompressionUtils.compressAndBase64EncodeBytes(bufferOfBuffers, useSnappy, delimiter);
        ByteBuffer framed = ByteBuffer.allocate(prefix.length + compressedData.length + nwnw.length);
        framed.put(prefix);
        framed.put(compressedData);
        framed.put(nwnw);
        return framed.array();
    }

    // Per-message framing applies to uncompressed SSE only; prefix/nwnw are not touched otherwise.
    final int perMessageOverhead = isSSE ? prefix.length + nwnw.length : 0;
    int totalBytes = 0;
    for (List<byte[]> buffer : bufferOfBuffers) {
        for (byte[] data : buffer) {
            totalBytes += data.length + perMessageOverhead;
        }
    }

    ByteBuffer block = ByteBuffer.wrap(new byte[totalBytes]);
    for (List<byte[]> buffer : bufferOfBuffers) {
        for (byte[] data : buffer) {
            if (isSSE) {
                block.put(prefix);
            }
            block.put(data);
            if (isSSE) {
                block.put(nwnw);
            }
        }
    }
    return block.array();
}