public static void main()

in Beam/src/main/java/com/amazonaws/kinesisanalytics/beam/BasicBeamStreamingJob.java [53:86]


    public static void main(String[] args) {
        String[] kinesisArgs = BasicBeamStreamingJobOptionsParser.argsFromKinesisApplicationProperties(args, BEAM_APPLICATION_PROPERTIES);
        BasicBeamStreamingJobOptions options = PipelineOptionsFactory.fromArgs(ArrayUtils.addAll(args, kinesisArgs)).as(BasicBeamStreamingJobOptions.class);
        options.setRunner(FlinkRunner.class);
        Regions region = Optional
                .ofNullable(Regions.getCurrentRegion())
                .map(r -> Regions.fromName(r.getName()))
                .orElse(Regions.fromName(options.getAwsRegion()));

        PipelineOptionsValidator.validate(BasicBeamStreamingJobOptions.class, options);
        Pipeline p = Pipeline.create(options);

        p
        .apply("KDS source",
            KinesisIO
                .read()
                .withStreamName(options.getInputStreamName())
                .withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(region))
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
        )
        .apply("Pong transform",
            ParDo.of(new PingPongFn())
        )
        .apply("KDS sink",
            KinesisIO
                .write()
                .withStreamName(options.getOutputStreamName())
                .withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(region))
                // for this to properly balance across shards, the keys would need to be supplied dynamically
                .withPartitioner(new SimpleHashPartitioner())
        );

        p.run().waitUntilFinish();
    }