public static void main()

in docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java [73:135]


	public static void main(String[] args) throws Exception {
		final ParameterTool params = ParameterTool.fromArgs(args);

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		configureEnvironment(params, env);

		boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);

		String inputTopic = params.get("input-topic", "input");
		String outputTopic = params.get("output-topic", "output");
		String brokers = params.get("bootstrap.servers", "localhost:9092");
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");

		KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
				.setTopics(inputTopic)
				.setValueOnlyDeserializer(new ClickEventDeserializationSchema())
				.setProperties(kafkaProps)
				.build();

		WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
				.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
				.withIdleness(Duration.ofSeconds(5))
				.withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());

		DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "ClickEvent Source");

		if (inflictBackpressure) {
			// Force a network shuffle so that the backpressure will affect the buffer pools
			clicks = clicks
				.keyBy(ClickEvent::getPage)
				.map(new BackpressureMap())
				.name("Backpressure");
		}

		WindowAssigner<Object, TimeWindow> assigner = params.has(EVENT_TIME_OPTION) ?
				TumblingEventTimeWindows.of(WINDOW_SIZE) :
				TumblingProcessingTimeWindows.of(WINDOW_SIZE);

		DataStream<ClickEventStatistics> statistics = clicks
			.keyBy(ClickEvent::getPage)
			.window(assigner)
			.aggregate(new CountingAggregator(),
				new ClickEventStatisticsCollector())
			.name("ClickEvent Counter");

		statistics.sinkTo(
				KafkaSink.<ClickEventStatistics>builder()
						.setBootstrapServers(kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
						.setKafkaProducerConfig(kafkaProps)
						.setRecordSerializer(
								KafkaRecordSerializationSchema.builder()
										.setTopic(outputTopic)
										.setValueSerializationSchema(new ClickEventStatisticsSerializationSchema())
										.build())
						.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
						.build())
				.name("ClickEventStatistics Sink");

		env.execute("Click Event Count");
	}