in quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java [35:56]
/**
 * Entry point of the Kafka-to-Kafka sample job.
 *
 * <p>Builds a {@code StreamingContext} from the command-line arguments, reads
 * records through a {@code KafkaJavaSource}, converts each record into a
 * {@code Behavior} via {@code JsonUtils.read}, and writes the resulting stream
 * back out through a {@code KafkaJavaSink} as UTF-8 encoded JSON.
 *
 * @param args command-line arguments forwarded to {@code StreamEnvConfig}
 */
public static void main(String[] args) {
    StreamEnvConfig javaConfig = new StreamEnvConfig(args, (environment, parameterTool) -> {
        // Hook for customizing the execution environment before the job is built,
        // e.g. environment.getConfig().enableForceAvro();
        System.out.println("environment argument set...");
    });
    StreamingContext context = new StreamingContext(javaConfig);

    // 1) Read data from Kafka and deserialize each record into a Behavior.
    // NOTE(review): the whole KafkaRecord is handed to JsonUtils.read; confirm it
    // extracts the JSON payload, otherwise the record's value should be passed instead.
    DataStream<Behavior> source = new KafkaJavaSource<String>(context)
        .getDataStream()
        .map((MapFunction<KafkaRecord<String>, Behavior>) value -> JsonUtils.read(value, Behavior.class));

    // 2) Write the data to another Kafka topic.
    // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
    // charset, which can differ between hosts and corrupt non-ASCII JSON.
    new KafkaJavaSink<Behavior>(context)
        .serializer((SerializationSchema<Behavior>) element ->
            JsonUtils.write(element).getBytes(java.nio.charset.StandardCharsets.UTF_8))
        .sink(source);

    context.start();
}