public static void main()

in quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java [33:70]


    public static void main(String[] args) {

        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);

        StreamingContext context = new StreamingContext(envConfig);

        //读取MySQL数据源
        new JdbcJavaSource<Order>(context)
                .getDataStream(
                        (SQLQueryFunction<Order>) lastOne -> {
                            //5秒抽取一次
                            Thread.sleep(5000);

                            Serializable lastOffset = lastOne == null ? "2020-10-10 23:00:00" : lastOne.getTimestamp();

                            return String.format(
                                    "select * from t_order " +
                                            "where timestamp > '%s' " +
                                            "order by timestamp asc ",
                                    lastOffset
                            );
                        },
                        (SQLResultFunction<Order>) map -> {
                            List<Order> result = new ArrayList<>();
                            map.forEach(item -> {
                                Order order = new Order();
                                order.setOrderId(item.get("order_id").toString());
                                order.setMarketId(item.get("market_id").toString());
                                order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
                                result.add(order);
                            });
                            return result;
                        }, null)
                .returns(TypeInformation.of(Order.class));

        context.start();

    }