public void start()

in mantis-common/src/main/java/io/mantisrx/common/metrics/MetricsServer.java [93:163]


    public void start() {

        final Observable<Measurements> measurements = measurements(publishRateInSeconds);

        logger.info("Starting metrics server on port: " + port);
        server = RxNetty.createHttpServer(
                port,
                new RequestHandler<ByteBuf, ServerSentEvent>() {
                    @Override
                    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ServerSentEvent> response) {

                        final Map<String, List<String>> queryParameters = request.getQueryParameters();

                        final List<String> namesToFilter = new LinkedList<>();
                        logger.info("got query params {}", queryParameters);
                        if (queryParameters != null && queryParameters.containsKey("name")) {
                            namesToFilter.addAll(queryParameters.get("name"));
                        }

                        Observable<Measurements> filteredObservable = measurements
                                .filter(new Func1<Measurements, Boolean>() {
                                    @Override
                                    public Boolean call(Measurements measurements) {
                                        if (!namesToFilter.isEmpty()) {
                                            // check filters
                                            for (String name : namesToFilter) {
                                                if (name.indexOf('*') != -1) {
                                                    // check for ends with
                                                    if (name.indexOf('*') == 0 && measurements.getName().endsWith(name.substring(1)))
                                                        return true;
                                                    // check for starts with
                                                    if (name.indexOf('*') > 0 && measurements.getName().startsWith(name.substring(0, name.indexOf('*')))) {
                                                        return true;
                                                    }
                                                }
                                                if (measurements.getName().equals(name)) {
                                                    return true; // filter match
                                                }
                                            }
                                            return false; // not found in filters
                                        } else {
                                            return true; // no filters provided
                                        }
                                    }
                                });

                        return filteredObservable.flatMap(new Func1<Measurements, Observable<Void>>() {
                            @Override
                            public Observable<Void> call(Measurements metrics) {
                                response.getHeaders().set("Access-Control-Allow-Origin", "*");
                                response.getHeaders().set("content-type", "text/event-stream");
                                ServerSentEvent event = null;
                                try {
                                    ByteBuf data = response.getAllocator().buffer().writeBytes((mapper.writeValueAsString(metrics)).getBytes());
                                    event = new ServerSentEvent(data);
                                    //event = new ServerSentEvent(null, "data", mapper.writeValueAsString(metrics));
                                } catch (JsonProcessingException e) {
                                    logger.error("Failed to map metrics to JSON", e);
                                }
                                if (event != null) {
                                    response.write(event);
                                    return response.writeStringAndFlush("\n");
                                }
                                return null;
                            }
                        });
                    }
                },
                PipelineConfigurators.<ByteBuf>serveSseConfigurator()
        ).start();
    }