protected Observable streamContent()

in mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java [291:351]


    /**
     * Turns the content of an SSE HTTP response into a stream of {@code MantisServerSentEvent}s and,
     * when a status callback is supplied, starts a periodic check that reports whether data is
     * currently being received.
     *
     * <p>Periodic check (only when {@code updateDataRecvngStatus} is non-null): every
     * {@code max(1, dataRecvTimeoutSecs / 2)} seconds, unless {@code isShutdown} is set, the callback
     * is invoked with {@code false} if {@code hasDataDrop()} is true or no event arrived within
     * {@code dataRecvTimeoutSecs}, and with {@code true} otherwise while {@code isConnected} is set.
     * The callback fires only when the {@code isReceivingData} flag actually flips, and every call is
     * made while holding the callback object's monitor. When the timer completes, a final
     * {@code false} is reported if the flag was still {@code true}.
     *
     * <p>Content pipeline: each event refreshes {@code lastDataReceived}; events whose event type
     * starts with {@code "error:"} make the mapping step throw a retryable {@code SseException};
     * the remaining payloads go through {@code DropOperator}, are re-batched, have payloads starting
     * with {@code "ping"} counted (and dropped unless {@code disablePingFiltering} is set), and are
     * finally expanded by {@code CompressionUtils.decompressAndBase64Decode}.
     *
     * <p>NOTE(review): the timer subscription is not retained and ends only through
     * {@code shutdownSubject} / {@code isShutdown}; it is not tied to the returned stream, so every
     * invocation starts its own timer. It is also subscribed without an error handler. Confirm both
     * are intended.
     *
     * @param response               HTTP response whose server-sent-event content is streamed
     * @param updateDataRecvngStatus callback told when data receiving starts/stops; may be {@code null},
     *                               in which case no periodic check is started
     * @param dataRecvTimeoutSecs    seconds without data after which receiving is considered stopped
     * @param delimiter              passed through to {@code CompressionUtils.decompressAndBase64Decode}
     * @return the decoded events, ending once {@code shutdownSubject} emits or {@code isShutdown} is set
     */
    protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> response,
                                                            final Action1<Boolean> updateDataRecvngStatus,
                                                            final long dataRecvTimeoutSecs, String delimiter) {
        if (updateDataRecvngStatus != null) {
            // Check twice per timeout window, but never more often than once a second.
            final long checkPeriodSecs = Math.max(1, dataRecvTimeoutSecs / 2);
            Observable.interval(checkPeriodSecs, checkPeriodSecs, TimeUnit.SECONDS)
                    .doOnNext((Long tick) -> {
                        if (isShutdown) {
                            return;
                        }
                        // hasDataDrop() must stay first so the timeout check is short-circuited exactly as before.
                        final boolean notReceiving = hasDataDrop()
                                || System.currentTimeMillis() > (lastDataReceived.get() + dataRecvTimeoutSecs * 1000);
                        if (notReceiving) {
                            // Only notify on an actual true -> false transition.
                            if (isReceivingData.compareAndSet(true, false)) {
                                synchronized (updateDataRecvngStatus) {
                                    updateDataRecvngStatus.call(false);
                                }
                            }
                        } else if (isConnected.get() && isReceivingData.compareAndSet(false, true)) {
                            // Only notify on an actual false -> true transition while connected.
                            synchronized (updateDataRecvngStatus) {
                                updateDataRecvngStatus.call(true);
                            }
                        }
                    })
                    .takeUntil(shutdownSubject)
                    .takeWhile((tick) -> !isShutdown)
                    .doOnCompleted(() -> {
                        // Timer is ending: make sure the last reported state is "not receiving".
                        if (isReceivingData.compareAndSet(true, false)) {
                            synchronized (updateDataRecvngStatus) {
                                updateDataRecvngStatus.call(false);
                            }
                        }
                    })
                    .subscribe();
        }

        return response.getContent()
                .map((ServerSentEvent sse) -> {
                    lastDataReceived.set(System.currentTimeMillis());

                    // The flag is flipped even when there is no callback to notify.
                    final boolean startedReceiving = isConnected.get() && isReceivingData.compareAndSet(false, true);
                    if (startedReceiving && updateDataRecvngStatus != null) {
                        synchronized (updateDataRecvngStatus) {
                            updateDataRecvngStatus.call(true);
                        }
                    }

                    final boolean isErrorEvent = sse.hasEventType() && sse.getEventTypeAsString().startsWith("error:");
                    if (isErrorEvent) {
                        throw new SseException(ErrorType.Retryable, "Got error SSE event: " + sse.contentAsString());
                    }
                    return sse.contentAsString();
                })
                .lift(new DropOperator<String>(metricGroupId))
                .rebatchRequests(this.bufferSize <= 0 ? 1 : this.bufferSize)
                .filter(payload -> {
                    if (!payload.startsWith("ping")) {
                        return true;
                    }
                    // Pings are always counted; they are forwarded only when filtering is disabled.
                    pingCounter.increment();
                    return this.disablePingFiltering;
                })
                .flatMapIterable(
                        (payload) -> CompressionUtils.decompressAndBase64Decode(
                                payload, compressedBinaryInputEnabled, /* useSnappy */ true, delimiter),
                        1)
                .takeUntil(shutdownSubject)
                .takeWhile((event) -> !isShutdown);
    }