in quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java [30:49]
/**
 * Entry point of the ClickHouse connector quickstart.
 *
 * <p>Builds a {@code StreamingContext} from the command-line arguments, registers
 * {@code MyDataSource} as a source of {@code Entity} records, attaches the asynchronous
 * ClickHouse sink that inserts each record into {@code test.orders}, and then calls
 * {@code context.start()}.
 *
 * @param args command-line arguments, handed unchanged to {@code StreamEnvConfig}
 */
public static void main(String[] args) {
    // The second constructor argument is deliberately left null, as in the original example.
    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
    StreamingContext context = new StreamingContext(envConfig);
    DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());

    // 2) Asynchronous, high-performance write.
    // Locale.ROOT is passed explicitly so that %d always renders plain ASCII digits;
    // with the JVM default locale the digits may be localized, which would corrupt the SQL text.
    new ClickHouseSink(context).asyncSink(source, value ->
        String.format(
            java.util.Locale.ROOT,
            "insert into test.orders(userId, siteId) values (%d,%d)",
            value.userId,
            value.siteId)
    ).setParallelism(1);

    // 3) Alternative: write through JDBC (kept for reference, not enabled).
    /*
     * new ClickHouseSink(context).jdbcSink(source, bean ->
     *     String.format(
     *         java.util.Locale.ROOT,
     *         "insert into test.orders(userId, siteId) values (%d,%d)",
     *         bean.userId,
     *         bean.siteId)
     * ).setParallelism(1);
     */

    context.start();
}