in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java [192:227]
public LineageVertex getLineageVertex() {
    // Builds the lineage vertex for this sink from the facets exposed by the record serializer.
    // A vertex over an empty dataset list is returned when the serializer cannot supply a
    // Kafka dataset facet; otherwise the vertex holds exactly one dataset.
    // NOTE(review): the helper used is LineageUtil.sourceLineageVertexOf even though this is a
    // sink; presumably the helper is shared between source and sink - confirm.

    // Guard: without a KafkaDatasetFacetProvider there is nothing to describe.
    if (!(recordSerializer instanceof KafkaDatasetFacetProvider)) {
        LOG.info(
                "recordSerializer does not implement KafkaDatasetFacetProvider: {}",
                recordSerializer);
        return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
    }

    // Guard: the provider may decline to return a facet.
    final Optional<KafkaDatasetFacet> maybeKafkaFacet =
            ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
    if (!maybeKafkaFacet.isPresent()) {
        LOG.info("Provider did not return kafka dataset facet");
        return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
    }

    // Enrich the dataset facet with the producer properties before it is published.
    final KafkaDatasetFacet kafkaFacet = maybeKafkaFacet.get();
    kafkaFacet.setProperties(this.kafkaProducerConfig);

    final String datasetNamespace = LineageUtil.namespaceOf(kafkaProducerConfig);

    // The type facet is optional: it is only requested from serializers that provide one.
    final Optional<TypeDatasetFacet> maybeTypeFacet =
            recordSerializer instanceof TypeDatasetFacetProvider
                    ? ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet()
                    : Optional.<TypeDatasetFacet>empty();

    if (!maybeTypeFacet.isPresent()) {
        // Dataset described by the Kafka facet alone.
        return LineageUtil.sourceLineageVertexOf(
                Collections.singleton(LineageUtil.datasetOf(datasetNamespace, kafkaFacet)));
    }

    // Dataset described by both the Kafka facet and the type facet.
    return LineageUtil.sourceLineageVertexOf(
            Collections.singleton(
                    LineageUtil.datasetOf(datasetNamespace, kafkaFacet, maybeTypeFacet.get())));
}