in quickstart-flink/quickstart-apacheflink/apacheflinksql_1.19/src/main/java/JavaFlinkSqlJob.java [23:61]
/**
 * Entry point of the quickstart job.
 *
 * <p>Creates a batch-mode table environment, registers a {@code datagen} source table and a
 * {@code print} sink table, and submits a single {@code INSERT INTO} statement that copies
 * {@code f_sequence}, {@code f_random} and {@code f_random_str} from the source to the sink.
 * The result of the submitted statement set is printed.
 *
 * @param args command-line arguments; not read by this method
 */
public static void main(String[] args) {
    // Batch-mode environment; the default parallelism is configured to 3.
    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    tEnv.getConfig().set("parallelism.default", "3");

    // Source DDL: 'number-of-rows' is 100 and f_sequence is a sequence from 1 to 100.
    // Lines are joined with '\n', yielding the same statement text as a manual concatenation.
    final String sourceDdl = String.join("\n",
            "CREATE TABLE datagen (",
            "f_sequence INT,",
            "f_random INT,",
            "f_random_str STRING,",
            "ts AS localtimestamp,",
            "WATERMARK FOR ts AS ts",
            ") WITH (",
            "'connector' = 'datagen',",
            "'rows-per-second'='5',",
            "'fields.f_sequence.kind'='sequence',",
            "'fields.f_sequence.start'='1',",
            "'fields.f_sequence.end'='100',",
            "'fields.f_random.min'='1',",
            "'fields.f_random.max'='100',",
            "'fields.f_random_str.length'='10',",
            "'number-of-rows'='100'",
            ")");
    tEnv.executeSql(sourceDdl);

    // Sink DDL: a 'print' connector table with the three data columns of the source.
    final String sinkDdl = String.join("\n",
            "CREATE TABLE print_table (",
            "f_sequence INT,",
            "f_random INT,",
            "f_random_str STRING",
            ") WITH (",
            "'connector' = 'print'",
            ")");
    tEnv.executeSql(sinkDdl);

    // Submit the insert through a statement set and print the returned result.
    final String insertDml =
            "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen";
    tEnv.createStatementSet()
            .addInsertSql(insertDml)
            .execute()
            .print();
}