in mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HttpUtility.java [49:121]
/**
 * Issues an HTTP GET for {@code uri} against {@code host:port} and exposes the response
 * content as text.
 *
 * <p>The client pipeline is customized in two ways: a logging handler records each outgoing
 * request and each incoming response together with the channel and request URI, and the
 * pipeline's {@link HttpObjectAggregator} is replaced with one that accepts up to ten
 * megabytes of content. Failures while replacing the aggregator are logged and otherwise
 * ignored, so the request is still attempted with the unmodified pipeline.
 *
 * <p>Redirects are followed, bounded by {@code MAX_REDIRECTS}. Each content buffer emitted by
 * the response is decoded as UTF-8; errors raised upstream of the timeout are logged at WARN
 * before being propagated to the subscriber.
 *
 * @param host host name or address of the server to contact
 * @param port TCP port of the server
 * @param uri  request URI to GET
 * @return an observable of the decoded response content, subject to a
 *         {@code GET_TIMEOUT_SECS}-second timeout
 */
static Observable<String> getGetResponse(String host, int port, String uri) {
    // Decode with a fixed charset rather than the platform default so the result does not
    // depend on the JVM's locale settings. UTF-8 is available on every Java platform.
    final Charset responseCharset = Charset.forName("UTF-8");

    return new CompositeHttpClientBuilder<ByteBuf, ByteBuf>()
            .appendPipelineConfigurator(
                    new PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>() {
                        @Override
                        public void configureNewPipeline(ChannelPipeline pipeline) {
                            pipeline.addLast("introspecting-handler", new ChannelDuplexHandler() {
                                // URI of the most recent request written on this channel; used to
                                // correlate the response log line with its request.
                                private String requestUri = "<undefined>";

                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                        throws Exception {
                                    if (msg instanceof HttpRequest) {
                                        HttpRequest request = (HttpRequest) msg;
                                        requestUri = request.getUri();
                                        logger.info("Sending request on channel id: {}, request URI: {}",
                                                ctx.channel(), requestUri);
                                    }
                                    super.write(ctx, msg, promise);
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    if (msg instanceof HttpResponse) {
                                        logger.info("Received response on channel id: {}, request URI: {}",
                                                ctx.channel(), requestUri);
                                    }
                                    super.channelRead(ctx, msg);
                                }
                            });
                            try {
                                int maxContentLength = 10 * 1024 * 1024; // Ten megabytes
                                pipeline.replace(HttpObjectAggregator.class, "http-object-aggregator",
                                        new HttpObjectAggregator(maxContentLength));
                            } catch (NoSuchElementException ex) {
                                logger.error("HttpObjectAggregator did not exist in this pipeline. Error: {}",
                                        ex.getMessage(), ex);
                            } catch (IllegalArgumentException ex) {
                                logger.error("ChannelHandler named http-object-aggregator already existed in this"
                                        + " pipeline. Error: {}", ex.getMessage(), ex);
                            } catch (Throwable t) {
                                // Best effort: a failure to raise the aggregator limit must not
                                // prevent the pipeline from being set up.
                                logger.error("Unknown error adding HttpObjectAggregator to Master Client "
                                        + "Pipeline. Error: {}", t.getMessage(), t);
                            }
                        }
                    })
            .build()
            .submit(new RxClient.ServerInfo(host, port),
                    HttpClientRequest.createGet(uri),
                    new HttpClient.HttpClientConfig.Builder()
                            .setFollowRedirect(true)
                            .followRedirect(MAX_REDIRECTS)
                            .build())
            .flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
                @Override
                public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
                    return response.getContent();
                }
            })
            .map(new Func1<ByteBuf, String>() {
                @Override
                public String call(ByteBuf content) {
                    return content.toString(responseCharset);
                }
            })
            .doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    logger.warn("Error: {}", throwable.getMessage(), throwable);
                }
            })
            .timeout(GET_TIMEOUT_SECS, TimeUnit.SECONDS);
}