private static StreamingFileSink createS3Sink()

in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [134:148]


	private static StreamingFileSink<String> createS3Sink(ParameterTool parameter) {
		log.info("Creating S3 sink from Application Properties");
		final StreamingFileSink<String> sink = StreamingFileSink
				.forRowFormat(new Path(parameter.get("s3_output_path")), new SimpleStringEncoder<String>("UTF-8"))
				.withBucketCheckInterval(
						TimeUnit.SECONDS.toMillis(Long.parseLong(parameter.get("bucket_check_interval_in_seconds"))))
				.withRollingPolicy(DefaultRollingPolicy.create()
						.withRolloverInterval(
								TimeUnit.SECONDS.toMillis(Long.parseLong(parameter.get("rolling_interval_in_seconds"))))
						.withInactivityInterval(TimeUnit.SECONDS
								.toMillis(Long.parseLong(parameter.get("inactivity_interval_in_seconds"))))
						.build())
				.build();
		return sink;
	}