in GettingStartedTable/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [38:78]
/**
 * Job entry point: resolves the Kafka topic, broker list and S3 base path, then hands the
 * stream to {@code StreamingTableAPI.process} and {@code StreamingSQLAPI.process}.
 *
 * <p>Each setting is resolved in this order (later wins):
 * <ol>
 *   <li>built-in default ({@code "AWSKafkaTutorialTopic"} for the topic, empty string otherwise),</li>
 *   <li>command-line argument ({@code kafka-topic}, {@code brokers}, {@code s3Path}),</li>
 *   <li>the same key in the {@code FlinkApplicationProperties} group returned by
 *       {@code KinesisAnalyticsRuntime.getApplicationProperties()}, when that group exists
 *       and defines the key.</li>
 * </ol>
 *
 * @param args command-line arguments parsed with {@code ParameterTool.fromArgs}
 * @throws Exception propagated unchanged from any of the calls made here
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ParameterTool parameter = ParameterTool.fromArgs(args);

    // Read the parameters from the Kinesis Analytics environment
    Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
    Properties flinkProperties = null;

    // Baseline values: command-line arguments, falling back to built-in defaults.
    String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic");
    String brokers = parameter.get("brokers", "");
    String s3Path = parameter.get("s3Path", "");

    if (applicationProperties != null) {
        flinkProperties = applicationProperties.get("FlinkApplicationProperties");
    }
    if (flinkProperties != null) {
        // Override only the keys the property group actually defines; a missing key keeps the
        // baseline value instead of failing with a NullPointerException.
        kafkaTopic = flinkProperties.getProperty("kafka-topic", kafkaTopic);
        brokers = flinkProperties.getProperty("brokers", brokers);
        s3Path = flinkProperties.getProperty("s3Path", s3Path);
    }

    // The "{}" placeholder is required for the argument to appear in the log output.
    LOG.info("kafkaTopic is {}", kafkaTopic);
    LOG.info("brokers is {}", brokers);
    LOG.info("s3Path is {}", s3Path);

    // Create Properties object for the Kafka consumer
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", brokers);

    // Process stream using table API
    StreamingTableAPI.process(env, kafkaTopic, s3Path + "/tableapi", kafkaProps);
    // Process stream using sql API
    StreamingSQLAPI.process(env, kafkaTopic, s3Path + "/sqlapi", kafkaProps);
}