in Beam/src/main/java/com/amazonaws/kinesisanalytics/beam/BasicBeamStreamingJob.java [53:86]
/**
 * Entry point: builds and runs a Beam pipeline that reads records from one Kinesis
 * data stream, applies {@link PingPongFn} to each element, and writes the results
 * to another Kinesis data stream.
 *
 * <p>Pipeline options are created from the command-line arguments combined with the
 * arguments returned by
 * {@code BasicBeamStreamingJobOptionsParser.argsFromKinesisApplicationProperties}
 * for the {@code BEAM_APPLICATION_PROPERTIES} group. The runner is always set to
 * {@link FlinkRunner}.
 *
 * <p>The AWS region is taken from {@code Regions.getCurrentRegion()} when that call
 * returns a non-null value; otherwise it falls back to the {@code awsRegion}
 * pipeline option.
 *
 * @param args command-line arguments forwarded to {@code PipelineOptionsFactory}
 */
public static void main(String[] args) {
    // Merge explicit command-line arguments with those derived from the application properties.
    String[] kinesisArgs = BasicBeamStreamingJobOptionsParser.argsFromKinesisApplicationProperties(args, BEAM_APPLICATION_PROPERTIES);
    BasicBeamStreamingJobOptions options = PipelineOptionsFactory
            .fromArgs(ArrayUtils.addAll(args, kinesisArgs))
            .as(BasicBeamStreamingJobOptions.class);
    options.setRunner(FlinkRunner.class);

    // Prefer the region reported by the SDK for the current environment. The fallback is
    // supplied lazily (orElseGet) so that the awsRegion option is only read and parsed
    // when no current region is available; with an eager orElse(...) a missing or
    // unknown awsRegion value could fail the job even though it is not needed.
    Regions region = Optional
            .ofNullable(Regions.getCurrentRegion())
            .map(r -> Regions.fromName(r.getName()))
            .orElseGet(() -> Regions.fromName(options.getAwsRegion()));

    PipelineOptionsValidator.validate(BasicBeamStreamingJobOptions.class, options);

    Pipeline p = Pipeline.create(options);

    p
        .apply("KDS source",
            KinesisIO
                .read()
                .withStreamName(options.getInputStreamName())
                .withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(region))
                // start from the LATEST position of the input stream
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
        )
        .apply("Pong transform",
            ParDo.of(new PingPongFn())
        )
        .apply("KDS sink",
            KinesisIO
                .write()
                .withStreamName(options.getOutputStreamName())
                .withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(region))
                // for this to properly balance across shards, the keys would need to be supplied dynamically
                .withPartitioner(new SimpleHashPartitioner())
        );

    // Start the pipeline and block until it finishes.
    p.run().waitUntilFinish();
}