mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java (240 lines of code) (raw):

/* * Copyright 2019 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.mantisrx.runtime.sink; import io.mantisrx.common.properties.MantisPropertiesLoader; import io.mantisrx.runtime.Context; import io.mantisrx.runtime.Metadata; import io.mantisrx.runtime.PortRequest; import io.mantisrx.runtime.sink.predicate.Predicate; import io.mantisrx.server.core.ServiceRegistry; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelOption; import io.netty.channel.WriteBufferWaterMark; import io.reactivex.mantis.network.push.PushServerSse; import io.reactivex.mantis.network.push.PushServers; import io.reactivex.mantis.network.push.Routers; import io.reactivex.mantis.network.push.ServerConfig; import io.reactivex.mantis.network.push.Router; import java.io.IOException; import java.util.List; import java.util.Map; import mantis.io.reactivex.netty.RxNetty; import mantis.io.reactivex.netty.pipeline.PipelineConfigurators; import mantis.io.reactivex.netty.protocol.http.server.HttpServer; import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.functions.Func1; import rx.functions.Func2; import rx.subjects.BehaviorSubject; public class ServerSentEventsSink<T> implements SelfDocumentingSink<T> { private static final Logger LOG = LoggerFactory.getLogger(ServerSentEventsSink.class); private final Func2<Map<String, List<String>>, Context, Void> subscribeProcessor; private final BehaviorSubject<Integer> portObservable = BehaviorSubject.create(); private final Func1<T, String> encoder; private final Func1<Throwable, String> errorEncoder; private final Predicate<T> predicate; private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor; private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor; private int port = -1; private final MantisPropertiesLoader propService; private final Router<T> router; private PushServerSse<T, Context> pushServerSse; private HttpServer<ByteBuf, ServerSentEvent> httpServer; public ServerSentEventsSink(Func1<T, String> encoder) { this(encoder, null, null); } ServerSentEventsSink(Func1<T, String> encoder, Func1<Throwable, String> errorEncoder, Predicate<T> predicate) { if (errorEncoder == null) { // default errorEncoder = Throwable::getMessage; } this.encoder = encoder; this.errorEncoder = errorEncoder; this.predicate = predicate; this.propService = ServiceRegistry.INSTANCE.getPropertiesService(); this.subscribeProcessor = null; this.router = null; } ServerSentEventsSink(Builder<T> builder) { this.encoder = builder.encoder; this.errorEncoder = builder.errorEncoder; this.predicate = builder.predicate; this.requestPreprocessor = builder.requestPreprocessor; this.requestPostprocessor = builder.requestPostprocessor; this.subscribeProcessor = builder.subscribeProcessor; this.propService = ServiceRegistry.INSTANCE.getPropertiesService(); this.router = builder.router; } @Override public Metadata metadata() { StringBuilder description = new StringBuilder(); description.append("HTTP server streaming results using Server-sent events. The sink" + " supports optional subscription (GET) parameters to change the events emitted" + " by the stream. A sampling interval can be applied to the stream using" + " the GET parameter sample=numSeconds. This will limit the stream rate to" + " events-per-numSeconds."); if (predicate != null && predicate.getDescription() != null) { description.append(" Predicate description: ").append(predicate.getDescription()); } return new Metadata.Builder() .name("Server Sent Event Sink") .description(description.toString()) .build(); } private boolean runNewSseServerImpl(String jobName) { String legacyServerString = propService.getStringValue("mantis.sse.newServerImpl", "true"); String legacyServerStringPerJob = propService.getStringValue(jobName + ".mantis.sse.newServerImpl", "false"); return Boolean.parseBoolean(legacyServerString) || Boolean.parseBoolean(legacyServerStringPerJob); } private int numConsumerThreads() { String consumerThreadsString = propService.getStringValue("mantis.sse.numConsumerThreads", "1"); return Integer.parseInt(consumerThreadsString); } private int maxChunkSize() { String maxChunkSize = propService.getStringValue("mantis.sse.maxChunkSize", "1000"); return Integer.parseInt(maxChunkSize); } private int maxReadTime() { String maxChunkSize = propService.getStringValue("mantis.sse.maxReadTimeMSec", "250"); return Integer.parseInt(maxChunkSize); } private int maxNotWritableTimeSec() { String maxNotWritableTimeSec = propService.getStringValue("mantis.sse.maxNotWritableTimeSec", "-1"); return Integer.parseInt(maxNotWritableTimeSec); } private int bufferCapacity() { String bufferCapacityString = propService.getStringValue("mantis.sse.bufferCapacity", "25000"); return Integer.parseInt(bufferCapacityString); } private boolean useSpsc() { String useSpsc = propService.getStringValue("mantis.sse.spsc", "false"); return Boolean.parseBoolean(useSpsc); } @Override public void call(Context context, PortRequest portRequest, final Observable<T> observable) { port = portRequest.getPort(); if (runNewSseServerImpl(context.getWorkerInfo().getJobClusterName())) { LOG.info("Serving modern HTTP SSE server sink on port: " + port); String serverName = "SseSink"; ServerConfig.Builder<T> config = new ServerConfig.Builder<T>() .name(serverName) .groupRouter(router != null ? router : Routers.roundRobinSse(serverName, encoder)) .port(port) .metricsRegistry(context.getMetricsRegistry()) .maxChunkTimeMSec(maxReadTime()) .maxChunkSize(maxChunkSize()) .bufferCapacity(bufferCapacity()) .numQueueConsumers(numConsumerThreads()) .useSpscQueue(useSpsc()) .maxChunkTimeMSec(getBatchInterval()) .maxNotWritableTimeSec(maxNotWritableTimeSec()); if (predicate != null) { config.predicate(predicate.getPredicate()); } pushServerSse = PushServers.infiniteStreamSse(config.build(), observable, requestPreprocessor, requestPostprocessor, subscribeProcessor, context, true); pushServerSse.start(); } else { LOG.info("Serving legacy HTTP SSE server sink on port: " + port); int batchInterval = getBatchInterval(); httpServer = RxNetty.newHttpServerBuilder( port, new ServerSentEventRequestHandler<>( observable, encoder, errorEncoder, predicate, requestPreprocessor, requestPostprocessor, context, batchInterval)) .pipelineConfigurator(PipelineConfigurators.<ByteBuf>serveSseConfigurator()) .channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024)) .build(); httpServer.start(); } portObservable.onNext(port); } @Override public void close() throws IOException { if (pushServerSse != null) { pushServerSse.shutdown(); } else if (httpServer != null) { try { httpServer.shutdown(); } catch (InterruptedException e) { throw new IOException(String.format("Failed to shut down the http server %s", httpServer), e); } } } private int getBatchInterval() { //default flush interval String flushIntervalMillisStr = ServiceRegistry.INSTANCE.getPropertiesService() .getStringValue("mantis.sse.batchInterval", "100"); LOG.info("Read fast property mantis.sse.batchInterval" + flushIntervalMillisStr); return Integer.parseInt(flushIntervalMillisStr); } private int getHighWaterMark() { String jobName = propService.getStringValue("JOB_NAME", "default"); int highWaterMark = 5 * 1024 * 1024; String highWaterMarkStr = propService.getStringValue( jobName + ".sse.highwater.mark", Integer.toString(5 * 1024 * 1024)); LOG.info("Read fast property:" + jobName + ".sse.highwater.mark ->" + highWaterMarkStr); try { highWaterMark = Integer.parseInt(highWaterMarkStr); } catch (Exception e) { LOG.error("Error parsing string " + highWaterMarkStr + " exception " + e.getMessage()); } return highWaterMark; } public int getServerPort() { return port; } /** * Notifies you when the mantis job is available to listen to, for use when you want to * write unit or regressions tests with the local runner that verify the output. */ public Observable<Integer> portConnections() { return portObservable; } public static class Builder<T> { private Func1<T, String> encoder; private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor; private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor; private Func1<Throwable, String> errorEncoder = Throwable::getMessage; private Predicate<T> predicate; private Func2<Map<String, List<String>>, Context, Void> subscribeProcessor; private Router<T> router; public Builder<T> withEncoder(Func1<T, String> encoder) { this.encoder = encoder; return this; } public Builder<T> withErrorEncoder(Func1<Throwable, String> errorEncoder) { this.errorEncoder = errorEncoder; return this; } public Builder<T> withPredicate(Predicate<T> predicate) { this.predicate = predicate; return this; } public Builder<T> withRequestPreprocessor(Func2<Map<String, List<String>>, Context, Void> preProcessor) { this.requestPreprocessor = preProcessor; return this; } public Builder<T> withSubscribePreprocessor( Func2<Map<String, List<String>>, Context, Void> subscribeProcessor) { this.subscribeProcessor = subscribeProcessor; return this; } public Builder<T> withRequestPostprocessor(Func2<Map<String, List<String>>, Context, Void> postProcessor) { this.requestPostprocessor = postProcessor; return this; } public Builder<T> withRouter(Router<T> router) { this.router = router; return this; } public ServerSentEventsSink<T> build() { return new ServerSentEventsSink<>(this); } } }