protected Observable manageConnectionWithCompression()

in mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java [227:418]


    protected Observable<Void> manageConnectionWithCompression(final DefaultChannelWriter<R> writer, String host, int port,
                                                               String groupId, String slotId, String id, final AtomicLong lastWriteTime, final boolean applicationHeartbeats,
                                                               final Subscription heartbeatSubscription, boolean applySampling, long samplingRateMSec,
                                                               final SerializedSubject<String, String> metaMsgSubject, final Subscription metaMsgSubscription,
                                                               Func1<T, Boolean> predicate, final Action0 connectionClosedCallback,
                                                               final Counter legacyMsgProcessedCounter, final Counter legacyDroppedWrites,
                                                               final Action0 connectionSubscribeCallback, boolean compressOutput, boolean isSSE,
                                                               byte[] delimiter, String availabilityZone) {

        if (id == null || id.isEmpty()) {
            id = host + "_" + port + "_" + System.currentTimeMillis();
        }

        if (slotId == null || slotId.isEmpty()) {
            slotId = id;
        }

        if (groupId == null || groupId.isEmpty()) {
            groupId = id;
        }

        final BasicTag clientIdTag = new BasicTag(CLIENT_ID_TAG_NAME, Optional.ofNullable(groupId).orElse("none"));

        SerializedSubject<List<byte[]>, List<byte[]>> subject
            = new SerializedSubject<>(PublishSubject.<List<byte[]>>create());
        Observable<List<byte[]>> observable = subject.lift(new DropOperator<>("batch_writes", clientIdTag));

        if (applySampling) {
            observable =
                observable
                    .sample(samplingRateMSec, TimeUnit.MILLISECONDS)
                    .map((List<byte[]> list) -> {
                            // get most recent item from sample
                            List<byte[]> singleItem = new LinkedList<>();
                            if (!list.isEmpty()) {
                                singleItem.add(list.get(list.size() - 1));
                            }
                            return singleItem;
                        }
                    );
        }

        Metrics writableMetrics = new Metrics.Builder()
            .id("PushServer", clientIdTag)
            .addCounter("channelWritable")
            .addCounter("channelNotWritable")
            .addCounter("channelNotWritableTimeout")
            .build();
        metricsRegistry.registerAndGet(writableMetrics);
        Counter channelWritableCounter = writableMetrics.getCounter("channelWritable");
        Counter channelNotWritableCounter = writableMetrics.getCounter("channelNotWritable");
        Counter channelNotWritableTimeoutCounter = writableMetrics.getCounter("channelNotWritableTimeout");

        final Future<?> writableCheck;
        AtomicLong lastWritableTS = new AtomicLong(System.currentTimeMillis());
        if (maxNotWritableTimeSec > 0) {
            writableCheck = scheduledExecutorService.scheduleAtFixedRate(
                () -> {
                    long currentTime = System.currentTimeMillis();
                    if (writer.getChannel().isWritable()) {
                        channelWritableCounter.increment();
                        lastWritableTS.set(currentTime);
                    } else if (currentTime - lastWritableTS.get() > TimeUnit.SECONDS.toMillis(maxNotWritableTimeSec)) {
                        logger.warn("Closing connection due to channel not writable for more than {} secs", maxNotWritableTimeSec);
                        channelNotWritableTimeoutCounter.increment();
                        try {
                            writer.close();
                        } catch (Throwable ex) {
                            logger.error("Failed to close connection.", ex);
                        }
                    } else {
                        channelNotWritableCounter.increment();
                    }
                },
                0,
                10,
                TimeUnit.SECONDS
            );
        } else {
            writableCheck = null;
        }

        final AsyncConnection<T> connection = new AsyncConnection<T>(host,
            port, id, slotId, groupId, subject, predicate, availabilityZone);

        final Channel channel = writer.getChannel();
        channel.closeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<Void> future) throws Exception {
                connectionManager.remove(connection);
                connectionCleanup(heartbeatSubscription, connectionClosedCallback, metaMsgSubscription);
                // Callback from the channel is closed, we don't need to check channel status anymore.
                if (writableCheck != null) {
                    writableCheck.cancel(false);
                }
            }
        });

        return
            observable
                .doOnSubscribe(() -> {
                        connectionManager.add(connection);
                        if (connectionSubscribeCallback != null) {
                            connectionSubscribeCallback.call();
                        }
                    }
                ) // per connection buffer
                .lift(new DisableBackPressureOperator<List<byte[]>>())
                .buffer(200, TimeUnit.MILLISECONDS)
                .flatMap((List<List<byte[]>> bufferOfBuffers) -> {
                        if (bufferOfBuffers != null && !bufferOfBuffers.isEmpty()) {
                            ByteBuffer blockBuffer = null;

                            int size = 0;
                            for (List<byte[]> buffer : bufferOfBuffers) {
                                size += buffer.size();
                            }
                            final int batchSize = size;
                            processedWrites.increment(batchSize);
                            if (channel.isActive() && channel.isWritable()) {
                                lastWritableTS.set(System.currentTimeMillis());
                                if (isSSE) {
                                    if (compressOutput) {
                                        boolean useSnappy = true;
                                        byte[] compressedData =  delimiter == null
                                            ? CompressionUtils.compressAndBase64EncodeBytes(bufferOfBuffers, useSnappy)
                                            : CompressionUtils.compressAndBase64EncodeBytes(bufferOfBuffers, useSnappy, delimiter);

                                        blockBuffer = ByteBuffer.allocate(prefix.length + compressedData.length + nwnw.length);
                                        blockBuffer.put(prefix);
                                        blockBuffer.put(compressedData);
                                        blockBuffer.put(nwnw);
                                    } else {
                                        int totalBytes = 0;
                                        for (List<byte[]> buffer : bufferOfBuffers) {

                                            for (byte[] data : buffer) {
                                                totalBytes += (data.length + prefix.length + nwnw.length);
                                            }
                                        }
                                        byte[] block = new byte[totalBytes];
                                        blockBuffer = ByteBuffer.wrap(block);
                                        for (List<byte[]> buffer : bufferOfBuffers) {
                                            for (byte[] data : buffer) {
                                                blockBuffer.put(prefix);
                                                blockBuffer.put(data);
                                                blockBuffer.put(nwnw);
                                            }
                                        }
                                    }
                                } else {
                                    int totalBytes = 0;
                                    for (List<byte[]> buffer : bufferOfBuffers) {

                                        for (byte[] data : buffer) {
                                            totalBytes += (data.length);
                                        }
                                    }
                                    byte[] block = new byte[totalBytes];
                                    blockBuffer = ByteBuffer.wrap(block);
                                    for (List<byte[]> buffer : bufferOfBuffers) {
                                        for (byte[] data : buffer) {
                                            blockBuffer.put(data);
                                        }
                                    }
                                }
                                return
                                    writer
                                        .writeBytesAndFlush(blockBuffer.array())
                                        .retry(writeRetryCount)
                                        .doOnError((Throwable t1) -> failedToWriteBatch(connection, batchSize, legacyDroppedWrites, metaMsgSubject))
                                        .doOnCompleted(() -> {
                                                if (applicationHeartbeats && lastWriteTime != null) {
                                                    lastWriteTime.set(System.currentTimeMillis());
                                                }
                                                if (legacyMsgProcessedCounter != null) {
                                                    legacyMsgProcessedCounter.increment(batchSize);
                                                }
                                                successfulWrites.increment(batchSize);
                                                connectionManager.successfulWrites(connection, batchSize);
                                            }
                                        )
                                        .doOnTerminate(() -> batchWriteSize.set(batchSize));
                            } else {
                                // connection is not active or writable
                                failedToWriteBatch(connection, batchSize, legacyDroppedWrites, metaMsgSubject);
                            }
                        }
                        return Observable.empty();
                    }
                );
    }