public static void main()

in quickstart-flink/quickstart-apacheflink/apacheflinksql_1.19/src/main/java/JavaFlinkSqlJob.java [23:61]


    /**
     * Runs the Flink SQL quickstart job.
     *
     * <p>Registers a {@code datagen} source table and a {@code print} sink table
     * in a batch-mode {@link TableEnvironment}, then submits a single
     * {@code INSERT INTO ... SELECT} that copies the three data columns from the
     * source to the sink and prints the result of that submission.
     *
     * @param args command-line arguments; not read by this job
     */
    public static void main(String[] args) {
        // Source: generated rows, capped at 100 via 'number-of-rows'.
        // f_sequence counts 1..100, f_random is drawn from 1..100 and
        // f_random_str is a 10-character string. ts is a computed column
        // (localtimestamp) that also carries the watermark declaration.
        final String sourceDdl = "CREATE TABLE datagen (\n"
                + "f_sequence INT,\n"
                + "f_random INT,\n"
                + "f_random_str STRING,\n"
                + "ts AS localtimestamp,\n"
                + "WATERMARK FOR ts AS ts\n"
                + ") WITH (\n"
                + "'connector' = 'datagen',\n"
                + "'rows-per-second'='5',\n"
                + "'fields.f_sequence.kind'='sequence',\n"
                + "'fields.f_sequence.start'='1',\n"
                + "'fields.f_sequence.end'='100',\n"
                + "'fields.f_random.min'='1',\n"
                + "'fields.f_random.max'='100',\n"
                + "'fields.f_random_str.length'='10',\n"
                + "'number-of-rows'='100'\n"
                + ")";

        // Sink: same three data columns as the source, backed by the print connector.
        final String sinkDdl = "CREATE TABLE print_table (\n"
                + "f_sequence INT,\n"
                + "f_random INT,\n"
                + "f_random_str STRING\n"
                + ") WITH (\n"
                + "'connector' = 'print'\n"
                + ")";

        // The ts column is deliberately left out: the sink has no matching column.
        final String copyRows =
                "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen";

        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.getConfig().set("parallelism.default", "3");

        // Both tables must be registered before the INSERT that references them.
        tableEnv.executeSql(sourceDdl);
        tableEnv.executeSql(sinkDdl);

        tableEnv.createStatementSet()
                .addInsertSql(copyRows)
                .execute()
                .print();
    }