in mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java [291:351]
/**
 * Turns the SSE response body into a stream of decoded events and, when a status callback is
 * supplied, starts a periodic check that reports whether data is currently being received.
 *
 * <p>Periodic check (only when {@code updateDataRecvngStatus} is non-null): every
 * {@code max(1, dataRecvTimeoutSecs / 2)} seconds, unless {@code isShutdown} is set, the
 * connection is treated as stalled if {@code hasDataDrop()} is true or the last event is older
 * than {@code dataRecvTimeoutSecs}. A transition of {@code isReceivingData} from true to false
 * reports {@code false}; a transition from false to true (while {@code isConnected}) reports
 * {@code true}. The check stops when {@code shutdownSubject} emits or {@code isShutdown} is
 * observed, and on completion reports {@code false} if data was still flagged as being received.
 *
 * <p>Content pipeline: each event refreshes {@code lastDataReceived}; an event whose type starts
 * with {@code "error:"} fails the stream with a retryable {@link SseException}; payloads starting
 * with {@code "ping"} are counted and dropped unless {@code disablePingFiltering} is set; the
 * remaining payloads are expanded by {@code CompressionUtils.decompressAndBase64Decode}.
 *
 * @param response               HTTP response whose content is the SSE stream
 * @param updateDataRecvngStatus callback told {@code true}/{@code false} on receiving-state
 *                               transitions; may be {@code null}, in which case nothing is reported
 * @param dataRecvTimeoutSecs    seconds without data after which the connection counts as stalled
 * @param delimiter              delimiter handed through to the decompression utility
 * @return the decoded events, ending when {@code shutdownSubject} emits or {@code isShutdown} is seen
 */
protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> response,
                                                          final Action1<Boolean> updateDataRecvngStatus,
                                                          final long dataRecvTimeoutSecs, String delimiter) {
    // Single place that invokes the caller's callback, always while holding the callback's own
    // monitor. Only called on paths where updateDataRecvngStatus has been checked for null.
    final Action1<Boolean> reportReceiving = receiving -> {
        synchronized (updateDataRecvngStatus) {
            updateDataRecvngStatus.call(receiving);
        }
    };

    if (updateDataRecvngStatus != null) {
        final long checkPeriodSecs = Math.max(1, dataRecvTimeoutSecs / 2);
        Observable.interval(checkPeriodSecs, checkPeriodSecs, TimeUnit.SECONDS)
            .doOnNext(tick -> {
                if (isShutdown) {
                    return;
                }
                final boolean stalled = hasDataDrop()
                    || System.currentTimeMillis() > (lastDataReceived.get() + dataRecvTimeoutSecs * 1000);
                if (stalled) {
                    // Report only on the true -> false edge.
                    if (isReceivingData.compareAndSet(true, false)) {
                        reportReceiving.call(false);
                    }
                } else if (isConnected.get() && isReceivingData.compareAndSet(false, true)) {
                    // Report only on the false -> true edge, and only while connected.
                    reportReceiving.call(true);
                }
            })
            .takeUntil(shutdownSubject)
            .takeWhile(ignored -> !isShutdown)
            .doOnCompleted(() -> {
                // The check is ending: make sure the last reported state is "not receiving".
                if (isReceivingData.compareAndSet(true, false)) {
                    reportReceiving.call(false);
                }
            })
            .subscribe();
    }

    return response.getContent()
        .map(sse -> {
            lastDataReceived.set(System.currentTimeMillis());
            // The flag is flipped even when there is no callback to notify.
            if (isConnected.get()
                && isReceivingData.compareAndSet(false, true)
                && updateDataRecvngStatus != null) {
                reportReceiving.call(true);
            }
            final boolean isErrorEvent =
                sse.hasEventType() && sse.getEventTypeAsString().startsWith("error:");
            if (isErrorEvent) {
                throw new SseException(ErrorType.Retryable, "Got error SSE event: " + sse.contentAsString());
            }
            return sse.contentAsString();
        })
        .lift(new DropOperator<String>(metricGroupId))
        .rebatchRequests(this.bufferSize > 0 ? this.bufferSize : 1)
        .filter(payload -> {
            if (!payload.startsWith("ping")) {
                return true;
            }
            pingCounter.increment();
            return this.disablePingFiltering;
        })
        .flatMapIterable(
            payload -> CompressionUtils.decompressAndBase64Decode(
                payload, compressedBinaryInputEnabled, /* useSnappy */ true, delimiter),
            1)
        .takeUntil(shutdownSubject)
        .takeWhile(decoded -> !isShutdown);
}