public static T loadKafkaToIgniteStreamer()

in modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java [51:92]


    /**
     * Loads a Kafka-to-Ignite CDC streamer from a Spring XML configuration file.
     * <p>
     * The file is expected to declare:
     * <ul>
     *     <li>exactly one of {@link IgniteConfiguration} or {@link ClientConfiguration} (never both);</li>
     *     <li>exactly one {@link KafkaToIgniteCdcStreamerConfiguration};</li>
     *     <li>a bean named {@code KAFKA_PROPERTIES} that is cast to {@link Properties}.</li>
     * </ul>
     * When a {@link ClientConfiguration} is defined the result is a {@link KafkaToIgniteClientCdcStreamer},
     * otherwise a {@link KafkaToIgniteCdcStreamer}.
     *
     * @param springXmlPath Path to the Spring XML configuration, resolved with {@code U.resolveSpringUrl}.
     * @param <T> Streamer type expected by the caller. The cast is unchecked, so the caller is responsible
     *      for requesting a type that matches the configuration kind found in the file.
     * @return Streamer instance built from the loaded configuration beans.
     * @throws IgniteCheckedException If the file cannot be loaded or the set of configuration beans
     *      does not satisfy the constraints listed above.
     */
    public static <T extends AbstractKafkaToIgniteCdcStreamer> T loadKafkaToIgniteStreamer(String springXmlPath)
        throws IgniteCheckedException {
        URL cfgUrl = U.resolveSpringUrl(springXmlPath);

        IgniteSpringHelper spring = SPRING.create(false);

        GridTuple3<Map<String, ?>, Map<Class<?>, Collection>, ? extends GridSpringResourceContext> cfgTuple =
            spring.loadBeans(cfgUrl, F.asList(KAFKA_PROPERTIES),
                IgniteConfiguration.class, ClientConfiguration.class, KafkaToIgniteCdcStreamerConfiguration.class);

        Collection<IgniteConfiguration> ignCfg = cfgTuple.get2().get(IgniteConfiguration.class);
        Collection<ClientConfiguration> clientCfg = cfgTuple.get2().get(ClientConfiguration.class);

        // Exactly one kind of destination (server/client node or thin client) must be configured.
        if (ignCfg.isEmpty() && clientCfg.isEmpty())
            throw new IgniteCheckedException("IgniteConfiguration or ClientConfiguration should be defined.");

        if (!ignCfg.isEmpty() && !clientCfg.isEmpty())
            throw new IgniteCheckedException("Either IgniteConfiguration or ClientConfiguration should be defined.");

        if (ignCfg.size() > 1)
            throw new IgniteCheckedException("Exact 1 IgniteConfiguration should be defined. Found " + ignCfg.size());

        if (clientCfg.size() > 1)
            throw new IgniteCheckedException("Exact 1 ClientConfiguration should be defined. Found " + clientCfg.size());

        Collection<KafkaToIgniteCdcStreamerConfiguration> k2iCfg =
            cfgTuple.get2().get(KafkaToIgniteCdcStreamerConfiguration.class);

        // The streamer configuration is mandatory: reject both "none" and "more than one" here,
        // otherwise an empty collection would surface later as a bare NoSuchElementException.
        if (k2iCfg.size() != 1) {
            throw new IgniteCheckedException(
                "Exact 1 KafkaToIgniteCdcStreamerConfiguration configuration should be defined. " +
                    "Found " + k2iCfg.size()
            );
        }

        KafkaToIgniteCdcStreamerConfiguration streamerCfg = k2iCfg.iterator().next();

        Properties kafkaProps = (Properties)cfgTuple.get1().get(KAFKA_PROPERTIES);

        // At this point exactly one of ignCfg/clientCfg holds a single element.
        if (ignCfg.isEmpty())
            return (T)new KafkaToIgniteClientCdcStreamer(clientCfg.iterator().next(), kafkaProps, streamerCfg);
        else
            return (T)new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(), kafkaProps, streamerCfg);
    }