in mantis-network/src/main/java/io/reactivex/mantis/network/push/LegacyTcpPushServer.java [62:177]
public RxServer<?, ?> createServer() {
RxServer<RemoteRxEvent, RemoteRxEvent> server
= RxNetty.newTcpServerBuilder(port, new ConnectionHandler<RemoteRxEvent, RemoteRxEvent>() {
@Override
public Observable<Void> handle(
final ObservableConnection<RemoteRxEvent, RemoteRxEvent> newConnection) {
final InetSocketAddress socketAddress = (InetSocketAddress) newConnection.getChannel().remoteAddress();
// extract groupId, id, predicate from incoming byte[]
return
newConnection.getInput()
.flatMap(new Func1<RemoteRxEvent, Observable<Void>>() {
@Override
public Observable<Void> call(
RemoteRxEvent incomingRequest) {
if (incomingRequest.getType() == RemoteRxEvent.Type.subscribed) {
Map<String, String> params = incomingRequest.getSubscribeParameters();
// client state
String id = null;
String slotId = null;
String groupId = null;
// sample state
boolean enableSampling = false;
long samplingTimeMsec = 0;
String availabilityZone = null;
// predicate state
Map<String, List<String>> predicateParams = null;
if (params != null && !params.isEmpty()) {
predicateParams = new HashMap<String, List<String>>();
for (Entry<String, String> entry : params.entrySet()) {
List<String> values = new LinkedList<>();
values.add(entry.getValue());
predicateParams.put(entry.getKey(), values);
}
if (params.containsKey("id")) {
id = params.get("id");
}
if (params.containsKey("slotId")) {
slotId = params.get("slotId");
}
if (params.containsKey("groupId")) {
groupId = params.get("groupId");
}
if (params.containsKey("sample")) {
samplingTimeMsec = Long.parseLong(params.get("sample")) * 1000;
if (samplingTimeMsec < 50) {
throw new IllegalArgumentException("Sampling rate too low: " + samplingTimeMsec);
}
enableSampling = true;
}
if (params.containsKey("sampleMSec")) {
samplingTimeMsec = Long.parseLong(params.get("sampleMSec"));
if (samplingTimeMsec < 50) {
throw new IllegalArgumentException("Sampling rate too low: " + samplingTimeMsec);
}
enableSampling = true;
}
if (params.containsKey("availabilityZone") && !params.get("availabilityZone").isEmpty()) {
availabilityZone = params.get("availabilityZone");
}
}
Func1<T, Boolean> predicateFunction = null;
if (predicate != null) {
predicateFunction = predicate.call(predicateParams);
}
// support legacy metrics per connection
Metrics sseSinkMetrics = new Metrics.Builder()
.name("DropOperator_outgoing_subject_" + slotId)
.addCounter("onNext")
.addCounter("dropped")
.build();
sseSinkMetrics = metricsRegistry.registerAndGet(sseSinkMetrics);
Counter legacyMsgProcessedCounter = sseSinkMetrics.getCounter("onNext");
Counter legacyDroppedWrites = sseSinkMetrics.getCounter("dropped");
return manageConnection(newConnection, socketAddress.getHostString(), socketAddress.getPort(),
groupId, slotId, id, null,
false, null, enableSampling, samplingTimeMsec,
predicateFunction, null, legacyMsgProcessedCounter, legacyDroppedWrites,
null, availabilityZone);
}
return null;
}
});
}
})
.pipelineConfigurator(new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(
new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
// pipeline.addLast(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging
pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
pipeline.addLast("heartbeat", new HeartbeatHandler());
pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(5242880, 0, 4, 0, 4)); // max frame = half MB
}
}, new LegacyTcpPipelineConfigurator(name)))
.channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024))
.build();
return server;
}