in demo-apps/demo-kinesis-driver/src/main/java/com/amazonaws/services/kinesisanalytics/DemoKinesisDriver.java [69:101]
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameter;
if (env instanceof LocalStreamEnvironment) {
//read the parameters specified from the command line
parameter = ParameterTool.fromArgs(args);
} else {
//read the parameters from the Kinesis Analytics environment
Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");
if (flinkProperties == null) {
throw new RuntimeException("Unable to load FlinkApplicationProperties properties from the Kinesis Analytics Runtime.");
}
parameter = ParameterToolUtils.fromApplicationProperties(flinkProperties);
}
// use mock source to generate data
DataStream<EmployeeInfo> employeeStream = env.addSource(new RandEmployeeInfoSource());
// create and get Kinesis sink
FlinkKinesisProducer<EmployeeInfo> kinesisSink = createKinesisSink(env, parameter);
employeeStream.addSink(kinesisSink);
// execute program
env.execute("Demo Kinesis Driver");
}