in quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java [33:70]
/**
 * Entry point of the MySQL quickstart: builds a streaming context, declares a
 * JDBC source over the {@code t_order} table and starts the context.
 *
 * <p>The source is driven by two callbacks passed to {@code getDataStream}:
 * one that returns the SQL text to run, and one that converts the returned
 * rows into {@code Order} objects.
 *
 * @param args command-line arguments, forwarded unchanged to {@code StreamEnvConfig}
 */
public static void main(String[] args) {
    StreamingContext context = new StreamingContext(new StreamEnvConfig(args, null));

    // Read from the MySQL data source.
    JdbcJavaSource<Order> orderSource = new JdbcJavaSource<Order>(context);

    // Produces the SQL for the next fetch. `previous` is presumably the last
    // Order produced by the preceding fetch (null on the first call) —
    // confirm against JdbcJavaSource.
    SQLQueryFunction<Order> nextQuery = previous -> {
        // Pause 5 seconds between extractions.
        Thread.sleep(5000);

        Serializable offset;
        if (previous == null) {
            // No previous record: fall back to a fixed starting offset.
            // NOTE(review): this is a date-time string, while later offsets come
            // from Order.getTimestamp(), which the row mapper below fills via
            // Long.parseLong — confirm both forms compare correctly against the
            // `timestamp` column.
            offset = "2020-10-10 23:00:00";
        } else {
            offset = previous.getTimestamp();
        }

        return String.format(
            "select * from t_order "
                + "where timestamp > '%s' "
                + "order by timestamp asc ",
            offset);
    };

    // Converts each result row (looked up by column name) into an Order.
    SQLResultFunction<Order> toOrders = rows -> {
        List<Order> orders = new ArrayList<>();
        rows.forEach(row -> {
            Order current = new Order();
            current.setOrderId(row.get("order_id").toString());
            current.setMarketId(row.get("market_id").toString());
            current.setTimestamp(Long.parseLong(row.get("timestamp").toString()));
            orders.add(current);
        });
        return orders;
    };

    // NOTE(review): the stream returned here is discarded and no sink or
    // further operator is attached in this method — confirm that is intended.
    orderSource
        .getDataStream(nextQuery, toOrders, null)
        .returns(TypeInformation.of(Order.class));

    context.start();
}