in HudiConnector/src/main/java/basic/application/StreamingJob.java [49:84]
/**
 * Entry point of the streaming job.
 *
 * <p>Resolves three settings: {@code kafka-topic}, {@code brokers} and {@code s3Path}.
 * Each is first read from the command-line arguments (defaulting to
 * {@code "AWSKafkaTutorialTopic"}, {@code ""} and {@code ""} respectively). If the
 * Kinesis Analytics runtime exposes a {@code FlinkApplicationProperties} group, any key
 * present there overrides the command-line value; keys missing from the group keep the
 * command-line/default value instead of failing with a {@link NullPointerException}.
 *
 * <p>The resolved topic, S3 path and a Kafka {@link Properties} object carrying
 * {@code bootstrap.servers} are then handed to {@code StreamingSQLAPI.process}.
 *
 * @param args command-line arguments, parsed with {@code ParameterTool.fromArgs}
 * @throws Exception if reading the runtime properties or the downstream processing fails
 */
public static void main(String[] args) throws Exception {
    // Set up the streaming execution environment.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final ParameterTool parameter = ParameterTool.fromArgs(args);

    // Command-line values (or built-in defaults) act as the baseline configuration.
    String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic");
    String brokers = parameter.get("brokers", "");
    String s3Path = parameter.get("s3Path", "");

    // Read the parameters from the Kinesis Analytics environment, when available.
    final Map<String, Properties> applicationProperties =
            KinesisAnalyticsRuntime.getApplicationProperties();
    final Properties flinkProperties = applicationProperties == null
            ? null
            : applicationProperties.get("FlinkApplicationProperties");

    if (flinkProperties != null) {
        // Override per key: a key absent from the runtime group keeps its baseline value.
        kafkaTopic = propertyOrDefault(flinkProperties, "kafka-topic", kafkaTopic);
        brokers = propertyOrDefault(flinkProperties, "brokers", brokers);
        s3Path = propertyOrDefault(flinkProperties, "s3Path", s3Path);
    }

    LOG.info("kafkaTopic is :" + kafkaTopic);
    LOG.info("brokers is :" + brokers);
    LOG.info("s3Path is :" + s3Path);

    // Create Properties object for the Kafka consumer.
    final Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", brokers);

    // Process stream using sql API.
    StreamingSQLAPI.process(env, kafkaTopic, s3Path, kafkaProps);
}

/**
 * Returns the string form of {@code key}'s value in {@code props}, or {@code fallback}
 * when the key has no value.
 *
 * @param props    properties to look the key up in; must not be {@code null}
 * @param key      property name
 * @param fallback value returned when {@code props} holds no value for {@code key}
 * @return the value's {@code toString()} result, or {@code fallback} if absent
 */
private static String propertyOrDefault(Properties props, String key, String fallback) {
    // Use get() rather than getProperty() so non-String values are still honoured.
    final Object value = props.get(key);
    return value != null ? value.toString() : fallback;
}