public static void main()

in GettingStartedTable/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [38:78]


    /**
     * Entry point of the streaming job.
     *
     * <p>Resolves the settings {@code kafka-topic}, {@code brokers} and {@code s3Path},
     * then hands the stream over to {@code StreamingTableAPI.process} and
     * {@code StreamingSQLAPI.process}.
     *
     * <p>Each setting is resolved in this order (first match wins):
     * <ol>
     *   <li>the {@code FlinkApplicationProperties} group returned by
     *       {@code KinesisAnalyticsRuntime.getApplicationProperties()}, if that group
     *       exists and contains the key;</li>
     *   <li>the command-line argument of the same name;</li>
     *   <li>the built-in default ({@code "AWSKafkaTutorialTopic"} for the topic,
     *       the empty string for the brokers and the S3 path).</li>
     * </ol>
     *
     * @param args command-line arguments, parsed with {@code ParameterTool.fromArgs}
     * @throws Exception propagated from the runtime property lookup or from either
     *                   {@code process} call
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameter = ParameterTool.fromArgs(args);

        //read the parameters from the Kinesis Analytics environment
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties flinkProperties = (applicationProperties != null)
                ? applicationProperties.get("FlinkApplicationProperties")
                : null;

        //command-line values (or built-in defaults) are the fallback
        String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic");
        String brokers = parameter.get("brokers", "");
        String s3Path = parameter.get("s3Path", "");

        if (flinkProperties != null) {
            //getProperty with a default keeps the fallback when a key is absent,
            //instead of throwing a NullPointerException on get(key).toString()
            kafkaTopic = flinkProperties.getProperty("kafka-topic", kafkaTopic);
            brokers = flinkProperties.getProperty("brokers", brokers);
            s3Path = flinkProperties.getProperty("s3Path", s3Path);
        }

        //the {} placeholder is required, otherwise the argument is silently dropped
        LOG.info("kafkaTopic is {}", kafkaTopic);
        LOG.info("brokers is {}", brokers);
        LOG.info("s3Path is {}", s3Path);


        //Create Properties object for the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", brokers);


        //Process stream using table API
        StreamingTableAPI.process(env, kafkaTopic, s3Path + "/tableapi", kafkaProps);

        //Process stream using sql API
        StreamingSQLAPI.process(env, kafkaTopic, s3Path + "/sqlapi", kafkaProps);
    }