public Observable handle()

in mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java [110:365]


    public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                   final HttpServerResponse<ServerSentEvent> response) {
        InetSocketAddress socketAddress = (InetSocketAddress) response.getChannel().remoteAddress();
        LOG.info("HTTP SSE connection received from " + socketAddress.getAddress() + ":" + socketAddress.getPort() + "  queryParams: " + request.getQueryParameters());
        final String socketAddrStr = socketAddress.getAddress().toString();

        final WritableEndpoint<String> sn = new WritableEndpoint<>(socketAddress.getHostString(),
                socketAddress.getPort(), Endpoint.uniqueHost(socketAddress.getHostString(), socketAddress.getPort(), null));

        final Map<String, List<String>> queryParameters = request.getQueryParameters();

        final SlotAssignmentManager<String> slotMgr = ssm.registerServer(sn, queryParameters);

        final AtomicLong lastResponseFlush = new AtomicLong();
        lastResponseFlush.set(-1);

        final AtomicLong lastResponseSent = new AtomicLong(-1);
        // copy reference, then apply request specific filters, sampling
        Observable<T> requestObservable = observableToServe;

        // decouple the observable on a separate thread and add backpressure handling
        String decoupleSSE = "false";//ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("sse.decouple", "false");
        //TODO Below condition would be always false during if condition.
        // Since decoupleSSE would be false and matching with true as string
        // would always ignore code inside if block
        if ("true".equals(decoupleSSE)) {
            final BasicTag sockAddrTag = new BasicTag("sockAddr", Optional.ofNullable(socketAddrStr).orElse("none"));
            requestObservable = requestObservable
                    .lift(new DropOperator<>("outgoing_ServerSentEventRequestHandler", sockAddrTag))
                    .observeOn(Schedulers.io());
        }
        response.getHeaders().set("Access-Control-Allow-Origin", "*");
        response.getHeaders().set("content-type", "text/event-stream");
        response.getHeaders().set("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
        response.getHeaders().set("Pragma", "no-cache");
        response.flush();

        String uniqueClientId = socketAddrStr;

        Tag[] tags = new Tag[2];
        final String clientId = Optional.ofNullable(uniqueClientId).orElse("none");
        final String sockAddr = Optional.ofNullable(socketAddrStr).orElse("none");
        tags[0] = new BasicTag("clientId", clientId);
        tags[1] = new BasicTag("sockAddr", sockAddr);

        Metrics sseSinkMetrics = new Metrics.Builder()
                .id("ServerSentEventRequestHandler", tags)
                .addCounter("processedCounter")
                .addCounter("pingCounter")
                .addCounter("errorCounter")
                .addCounter("droppedCounter")
                .addCounter("flushCounter")
                .addCounter("sourceJobNameMismatchRejection")
                .build();


        final Counter msgProcessedCounter = sseSinkMetrics.getCounter("processedCounter");
        final Counter pingCounter = sseSinkMetrics.getCounter("pingCounter");
        final Counter errorCounter = sseSinkMetrics.getCounter("errorCounter");
        final Counter droppedWrites = sseSinkMetrics.getCounter("droppedCounter");
        final Counter flushCounter = sseSinkMetrics.getCounter("flushCounter");
        final Counter sourceJobNameMismatchRejectionCounter = sseSinkMetrics.getCounter("sourceJobNameMismatchRejection");


        if (queryParameters != null && queryParameters.containsKey(MantisSSEConstants.TARGET_JOB)) {
            String targetJob = queryParameters.get(MantisSSEConstants.TARGET_JOB).get(0);
            String currentJob = this.context.getWorkerInfo().getJobClusterName();
            if (!currentJob.equalsIgnoreCase(targetJob)) {
                LOG.info("Rejecting connection from {}. Client is targeting job {} but this is job {}.", uniqueClientId, targetJob, currentJob);
                sourceJobNameMismatchRejectionCounter.increment();
                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                response.writeStringAndFlush("data: " + MantisSSEConstants.TARGET_JOB + " is " + targetJob + " but this is " + currentJob + "." + TWO_NEWLINES);
                return response.close();
            }
        }

        if (queryParameters != null && queryParameters.containsKey(CLIENT_ID_PARAM)) {
            // enablePings
            uniqueClientId = queryParameters.get(CLIENT_ID_PARAM).get(0);

        }

        if (queryParameters != null && queryParameters.containsKey(FORMAT_PARAM)) {

            format = queryParameters.get(FORMAT_PARAM).get(0);

        }

        if (queryParameters != null && requestPreprocessor != null) {

            requestPreprocessor.call(queryParameters, context);
        }

        // apply sampling, milli, then seconds
        if (queryParameters != null && queryParameters.containsKey(SAMPLE_PARAM_MSEC)) {
            // apply sampling rate
            int samplingRate = Integer.parseInt(queryParameters.get(SAMPLE_PARAM_MSEC).get(0));
            requestObservable = requestObservable.sample(samplingRate, TimeUnit.MILLISECONDS);
        }

        if (queryParameters != null && queryParameters.containsKey(SAMPLE_PARAM)) {
            // apply sampling rate
            int samplingRate = Integer.parseInt(queryParameters.get(SAMPLE_PARAM).get(0));
            requestObservable = requestObservable.sample(samplingRate, TimeUnit.SECONDS);
        }

        if (queryParameters != null && queryParameters.containsKey(ENABLE_PINGS_PARAM)) {
            // enablePings
            String enablePings = queryParameters.get(ENABLE_PINGS_PARAM).get(0);
            //TODO Note: Code logic can be improved here.
            // since if condition check returns same true or false which can equated to pingsEnabled value.
            if ("true".equalsIgnoreCase(enablePings)) {
                pingsEnabled = true;
            } else {
                pingsEnabled = false;
            }
        }

        if (queryParameters != null && queryParameters.containsKey("delay")) {
            // apply flush
            try {
                int flushInterval = Integer.parseInt(queryParameters.get("delay").get(0));
                if (flushInterval >= 50) {
                    flushIntervalMillis = flushInterval;
                } else {
                    LOG.warn("delay parameter too small " + flushInterval + " min. is 100");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        final byte[] delimiter = queryParameters != null
                && queryParameters.containsKey(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER)
                && queryParameters.get(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER).get(0) != null
                ? queryParameters.get(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER).get(0).getBytes()
                : null;

        // get predicate, defaults to return true for all T
        Func1<T, Boolean> filterFunction = new Func1<T, Boolean>() {
            @Override
            public Boolean call(T t1) {
                return true;
            }
        };
        if (queryParameters != null && predicate != null) {
            filterFunction = predicate.getPredicate().call(queryParameters);
        }

        final Subscription timerSubscription = Observable.interval(1, TimeUnit.SECONDS).doOnNext(new Action1<Long>() {
            @Override
            public void call(Long t1) {
                long currentTime = System.currentTimeMillis();
                if (pingsEnabled && (lastResponseSent.get() == -1 || currentTime > lastResponseSent.get() + PING_INTERVAL)) {
                    pingCounter.increment();

                    response.writeStringAndFlush(PING);

                    lastResponseSent.set(currentTime);
                }
            }
        }).subscribe();

        return requestObservable

                .filter(filterFunction)
                .map(encoder)
                .lift(new DisableBackPressureOperator<>())
                .buffer(flushIntervalMillis, TimeUnit.MILLISECONDS)
                .flatMap(new Func1<List<String>, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(List<String> valueList) {
                        if (response.isCloseIssued() || !response.getChannel().isActive()) {
                            LOG.info("Client closed detected, throwing closed channel exception");
                            return Observable.error(new ClosedChannelException());
                        }

                        List<String> filteredList = valueList.stream().filter(e -> {
                            return slotMgr.filter(sn, e.getBytes());
                        }).collect(Collectors.toList());
                        if (response.getChannel().isWritable()) {


                            flushCounter.increment();

                            if (format.equals(BINARY_FORMAT)) {
                                boolean useSnappy = true;
                                try {
                                    String compressedList = delimiter == null
                                            ? CompressionUtils.compressAndBase64Encode(filteredList, useSnappy)
                                            : CompressionUtils.compressAndBase64Encode(filteredList, useSnappy, delimiter);
                                    StringBuilder sb = new StringBuilder(3);
                                    sb.append(SSE_DATA_PREFIX);
                                    sb.append(compressedList);
                                    sb.append(TWO_NEWLINES);

                                    msgProcessedCounter.increment(valueList.size());
                                    lastResponseSent.set(System.currentTimeMillis());
                                    return response.writeStringAndFlush(sb.toString());
                                } catch (Exception e) {
                                    LOG.warn("Could not compress data" + e.getMessage());
                                    droppedWrites.increment(valueList.size());
                                    return Observable.empty();
                                }
                            } else {
                                int noOfMsgs = 0;
                                StringBuilder sb = new StringBuilder(valueList.size() * 3);
                                for (String s : filteredList) {

                                    sb.append(SSE_DATA_PREFIX);
                                    sb.append(s);
                                    sb.append(TWO_NEWLINES);
                                    noOfMsgs++;


                                }
                                msgProcessedCounter.increment(noOfMsgs);
                                lastResponseSent.set(System.currentTimeMillis());
                                return response.writeStringAndFlush(sb.toString());
                            }

                        } else {
                            //
                            droppedWrites.increment(filteredList.size());
                        }
                        return Observable.empty();
                    }
                })
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() {
                    @Override
                    public Observable<? extends Void> call(Throwable throwable) {
                        Throwable cause = throwable.getCause();
                        // ignore closed channel exceptions, this is
                        // when the connection was closed on the client
                        // side without informing the server
                        errorCounter.increment();
                        if (cause != null && !(cause instanceof ClosedChannelException)) {
                            LOG.warn("Error detected in SSE sink", cause);
                            if (errorEncoder != null) {
                                // write error out on connection
                                //response.writeAndFlush(errorEncoder.call(throwable));
                                ByteBuf errType = response.getAllocator().buffer().writeBytes("error: ".getBytes());
                                ByteBuf errRes = response.getAllocator().buffer().writeBytes((errorEncoder.call(throwable)).getBytes());
                                response.writeAndFlush(ServerSentEvent.withEventType(errType, errRes));
                            }
                            throwable.printStackTrace();
                        }
                        if (requestPostprocessor != null && queryParameters != null) {
                            requestPostprocessor.call(queryParameters, context);
                        }
                        ssm.deregisterServer(sn, queryParameters);
                        timerSubscription.unsubscribe();
                        return Observable.error(throwable);
                    }
                });
    }