static Observable getGetResponse()

in mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HttpUtility.java [49:121]


    static Observable<String> getGetResponse(String host, int port, String uri) {
        return new CompositeHttpClientBuilder<ByteBuf, ByteBuf>()
                .appendPipelineConfigurator(
                        new PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>() {
                            @Override
                            public void configureNewPipeline(ChannelPipeline pipeline) {
                                pipeline.addLast("introspecting-handler", new ChannelDuplexHandler() {
                                    private String uri = "<undefined>";

                                    @Override
                                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                            throws Exception {
                                        if (msg instanceof HttpRequest) {
                                            HttpRequest request = (HttpRequest) msg;
                                            uri = request.getUri();
                                            logger.info("Sending request on channel id: " + ctx.channel().toString() +
                                                    ", request URI: " + uri);
                                        }
                                        super.write(ctx, msg, promise);
                                    }

                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        if (msg instanceof HttpResponse) {
                                            logger.info("Received response on channel id: " + ctx.channel().toString() +
                                                    ", request URI: " + uri);
                                        }
                                        super.channelRead(ctx, msg);
                                    }
                                });

                                try {
                                    int maxContentLength = 10 * 1024 * 1024; // Ten megabytes
                                    pipeline.replace(HttpObjectAggregator.class, "http-object-aggregator",
                                            new HttpObjectAggregator(maxContentLength));
                                } catch (NoSuchElementException ex) {
                                    logger.error("HttpObjectAggregator did not exist in this pipeline. Error: {}",
                                            ex.getMessage(), ex);
                                } catch (IllegalArgumentException ex) {
                                    logger.error("ChannelHandler named http-object-aggregator already existed in this" +
                                            " pipeline. Error: {}", ex.getMessage(), ex);
                                }
                                catch (Throwable t) {
                                    logger.error("Unknown error adding HttpObjectAggregator to Master Client " +
                                            "Pipeline. Error: {}", t.getMessage(), t);
                                }
                            }

                        })
                .build()
                .submit(new RxClient.ServerInfo(host, port),
                        HttpClientRequest.createGet(uri),
                        new HttpClient.HttpClientConfig.Builder().setFollowRedirect(true).followRedirect(MAX_REDIRECTS).build())
                .flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
                    @Override
                    public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
                        return response.getContent();
                    }
                })
                .map(new Func1<ByteBuf, String>() {
                    @Override
                    public String call(ByteBuf o) {
                        return o.toString(Charset.defaultCharset());
                    }
                })
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        logger.warn("Error: " + throwable.getMessage(), throwable);
                    }
                })
                .timeout(GET_TIMEOUT_SECS, TimeUnit.SECONDS);
    }