public static void main()

in analytics/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [76:102]


    public static void main(String[] args) throws Exception {
        final ParameterTool parameter = ParameterToolUtils.fromArgsAndApplicationProperties(args);

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final String region = parameter.get("Region", DEFAULT_REGION_NAME);
        final String databaseName = parameter.get("TimestreamDbName", DEFAULT_DB_NAME);
        final String tableName = parameter.get("TimestreamTableName", DEFAULT_TABLE_NAME);
        final int batchSize = Integer.parseInt(parameter.get("TimestreamIngestBatchSize", "75"));

        TimestreamInitializer timestreamInitializer = new TimestreamInitializer(region);
        timestreamInitializer.createDatabase(databaseName);
        timestreamInitializer.createTable(databaseName, tableName);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);

        createKinesisSource(env, parameter)
                .map(new JsonToTimestreamPayloadFn()).name("MaptoTimestreamPayload")
                .process(new OffsetFutureTimestreamPoints()).name("UpdateFutureOffsetedTimestreamPoints")
                .addSink(new TimestreamSink(region, databaseName, tableName, batchSize))
                .name("TimeSeries<" + databaseName + ", " + tableName + ">");

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }