in docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java [73:134]
/**
 * Entry point of the click-event counting job.
 *
 * <p>Reads {@link ClickEvent}s from a Kafka topic, counts them per page in tumbling windows of
 * {@code WINDOW_SIZE}, and writes the resulting {@link ClickEventStatistics} to another Kafka
 * topic.
 *
 * <p>Recognized arguments (parsed with {@link ParameterTool#fromArgs(String[])}):
 * <ul>
 *   <li>{@code input-topic} - topic to read from, defaults to {@code "input"}</li>
 *   <li>{@code output-topic} - topic to write to, defaults to {@code "output"}</li>
 *   <li>{@code bootstrap.servers} - Kafka brokers, defaults to {@code "localhost:9092"}</li>
 *   <li>{@code BACKPRESSURE_OPTION} - if present, a {@code BackpressureMap} is inserted
 *       between the source and the window operator</li>
 *   <li>{@code EVENT_TIME_OPTION} - if present, event-time windows are used; otherwise
 *       processing-time windows</li>
 * </ul>
 * Further options may be consumed by {@code configureEnvironment}.
 *
 * @param args command-line arguments, see above
 * @throws Exception propagated from environment configuration or job execution
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    configureEnvironment(params, env);

    boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);

    String inputTopic = params.get("input-topic", "input");
    String outputTopic = params.get("output-topic", "output");
    String brokers = params.get("bootstrap.servers", "localhost:9092");

    // Consumer and producer settings are kept in separate property sets so that the
    // consumer-only group id is not handed to the sink's producer.
    Properties consumerProps = new Properties();
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");

    Properties producerProps = new Properties();
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

    KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
            .setTopics(inputTopic)
            .setValueOnlyDeserializer(new ClickEventDeserializationSchema())
            .setProperties(consumerProps)
            .build();

    // Event timestamps come from the click event itself; events may arrive up to 200 ms
    // out of order before they are considered late.
    WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
            .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
            .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());

    DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "ClickEvent Source");

    if (inflictBackpressure) {
        // Force a network shuffle so that the backpressure will affect the buffer pools
        clicks = clicks
                .keyBy(ClickEvent::getPage)
                .map(new BackpressureMap())
                .name("Backpressure");
    }

    WindowAssigner<Object, TimeWindow> assigner = params.has(EVENT_TIME_OPTION) ?
            TumblingEventTimeWindows.of(WINDOW_SIZE) :
            TumblingProcessingTimeWindows.of(WINDOW_SIZE);

    DataStream<ClickEventStatistics> statistics = clicks
            .keyBy(ClickEvent::getPage)
            .window(assigner)
            .aggregate(new CountingAggregator(),
                    new ClickEventStatisticsCollector())
            .name("ClickEvent Counter");

    // NOTE(review): newer Flink Kafka connector versions appear to name the builder method
    // below setDeliveryGuarantee - confirm against the connector version this project uses.
    statistics.sinkTo(
            KafkaSink.<ClickEventStatistics>builder()
                    .setBootstrapServers(brokers)
                    .setKafkaProducerConfig(producerProps)
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic(outputTopic)
                                    .setValueSerializationSchema(new ClickEventStatisticsSerializationSchema())
                                    .build())
                    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build())
            .name("ClickEventStatistics Sink");

    env.execute("Click Event Count");
}