in quickstart-flink/quickstart-datastream/datastream_1.16/src/main/java/org/apache/streampark/flink/quickstart/datastream/QuickStartJavaApp.java [36:54]
/**
 * Entry point of the quick-start job.
 *
 * <p>Wires a Kafka source to a JDBC sink: the value of each Kafka record is parsed into a
 * {@code JavaUser} with {@code JsonUtils.read}, only users whose {@code age} is below 30 pass
 * the filter, and the surviving users are handed to a {@code JdbcJavaSink} whose SQL comes
 * from {@code JavaUser::toSql}. The context is started once the pipeline is assembled.
 *
 * @param args command-line arguments, forwarded unchanged to {@code StreamEnvConfig}
 */
public static void main(String[] args) {
    // The second StreamEnvConfig argument is deliberately passed as null.
    StreamingContext context = new StreamingContext(new StreamEnvConfig(args, null));

    // Named function objects replace the inline casts; the target types are the same.
    MapFunction<KafkaRecord<String>, JavaUser> parseUser =
        record -> JsonUtils.read(record.value(), JavaUser.class);
    FilterFunction<JavaUser> youngerThanThirty = user -> user.age < 30;
    // NOTE(review): the statement text is produced by JavaUser.toSql, which is not visible
    // here; confirm how it handles field values coming from Kafka.
    TransformFunction<JavaUser, String> userToSql = JavaUser::toSql;

    // Kafka records -> JavaUser -> users with age < 30.
    DataStream<JavaUser> source = new KafkaJavaSource<String>(context)
        .getDataStream()
        .map(parseUser)
        .filter(youngerThanThirty);

    // Attach the JDBC sink to the filtered stream.
    new JdbcJavaSink<JavaUser>(context)
        .sql(userToSql)
        .sink(source);

    context.start();
}