in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [134:148]
/**
 * Builds the row-format {@link StreamingFileSink} used to write string records to the
 * location given by the {@code s3_output_path} application property.
 *
 * <p>Records are encoded with a {@link SimpleStringEncoder} configured for UTF-8. The
 * following properties are read, each expressed as a whole number of seconds and
 * converted to milliseconds before being handed to the sink builder:
 * <ul>
 *   <li>{@code bucket_check_interval_in_seconds} - bucket check interval</li>
 *   <li>{@code rolling_interval_in_seconds} - rollover interval of the rolling policy</li>
 *   <li>{@code inactivity_interval_in_seconds} - inactivity interval of the rolling policy</li>
 * </ul>
 *
 * @param parameter application properties holding the sink configuration
 * @return the configured sink
 * @throws IllegalArgumentException if {@code s3_output_path} is not set
 * @throws NumberFormatException    if an interval property is missing or is not a
 *                                  valid {@code long}
 */
private static StreamingFileSink<String> createS3Sink(ParameterTool parameter) {
    log.info("Creating S3 sink from Application Properties");

    // Fail with the offending key in the message instead of an anonymous
    // "null string" error raised from inside the Path constructor.
    final String outputPath = parameter.get("s3_output_path");
    if (outputPath == null) {
        throw new IllegalArgumentException("Missing required application property 's3_output_path'");
    }

    return StreamingFileSink
            .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
            .withBucketCheckInterval(secondsPropertyToMillis(parameter, "bucket_check_interval_in_seconds"))
            .withRollingPolicy(DefaultRollingPolicy.create()
                    .withRolloverInterval(secondsPropertyToMillis(parameter, "rolling_interval_in_seconds"))
                    .withInactivityInterval(secondsPropertyToMillis(parameter, "inactivity_interval_in_seconds"))
                    .build())
            .build();
}

/**
 * Reads a property holding a whole number of seconds and returns it in milliseconds.
 *
 * @param parameter application properties to read from
 * @param key       name of the property to read
 * @return the property value converted from seconds to milliseconds
 * @throws NumberFormatException if the property is absent or cannot be parsed as a
 *                               {@code long}; the message names the offending key
 */
private static long secondsPropertyToMillis(ParameterTool parameter, String key) {
    final String raw = parameter.get(key);
    if (raw == null) {
        // Same exception type Long.parseLong(null) would raise, but naming the key.
        throw new NumberFormatException(
                "Missing required application property '" + key + "' (expected a whole number of seconds)");
    }
    try {
        return TimeUnit.SECONDS.toMillis(Long.parseLong(raw));
    } catch (NumberFormatException e) {
        // NumberFormatException has no (message, cause) constructor, so attach the cause manually.
        final NumberFormatException detailed = new NumberFormatException(
                "Application property '" + key + "' must be a whole number of seconds but was '" + raw + "'");
        detailed.initCause(e);
        throw detailed;
    }
}