public LineageVertex getLineageVertex()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java [192:227]


    public LineageVertex getLineageVertex() {
        // Builds the lineage vertex for this sink from the facets exposed by recordSerializer.
        // An empty vertex (no datasets) is returned when the serializer is not a
        // KafkaDatasetFacetProvider or when the provider yields no Kafka dataset facet.
        // Otherwise the vertex holds exactly one dataset, which also carries the type facet
        // if the serializer can supply one.

        // Guard: the serializer cannot describe the Kafka dataset at all.
        if (!(recordSerializer instanceof KafkaDatasetFacetProvider)) {
            LOG.info(
                    "recordSerializer does not implement KafkaDatasetFacetProvider: {}",
                    recordSerializer);
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }

        // Guard: the serializer is a provider but has no facet to offer.
        Optional<KafkaDatasetFacet> providedKafkaFacet =
                ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
        if (!providedKafkaFacet.isPresent()) {
            LOG.info("Provider did not return kafka dataset facet");
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }

        // Enrich the facet with the producer configuration; this mutates the facet instance
        // handed out by the provider.
        KafkaDatasetFacet kafkaFacet = providedKafkaFacet.get();
        kafkaFacet.setProperties(this.kafkaProducerConfig);

        String datasetNamespace = LineageUtil.namespaceOf(kafkaProducerConfig);

        // The type facet is optional: attach it only when the serializer provides one.
        if (recordSerializer instanceof TypeDatasetFacetProvider) {
            Optional<TypeDatasetFacet> providedTypeFacet =
                    ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet();
            if (providedTypeFacet.isPresent()) {
                return LineageUtil.sourceLineageVertexOf(
                        Collections.singleton(
                                LineageUtil.datasetOf(
                                        datasetNamespace, kafkaFacet, providedTypeFacet.get())));
            }
        }

        // No type information available: the dataset carries the Kafka facet only.
        return LineageUtil.sourceLineageVertexOf(
                Collections.singleton(LineageUtil.datasetOf(datasetNamespace, kafkaFacet)));
    }