in modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java [51:92]
/**
 * Builds a Kafka-to-Ignite CDC streamer from the beans declared in a Spring XML file.
 * <p>
 * The file must declare exactly one of {@code IgniteConfiguration} or {@code ClientConfiguration},
 * exactly one {@code KafkaToIgniteCdcStreamerConfiguration}, and the bean named by
 * {@code KAFKA_PROPERTIES}. A {@code ClientConfiguration} yields a {@code KafkaToIgniteClientCdcStreamer};
 * an {@code IgniteConfiguration} yields a {@code KafkaToIgniteCdcStreamer}.
 *
 * @param springXmlPath Path to the Spring XML configuration file.
 * @param <T> Streamer type expected by the caller. The cast to {@code T} is unchecked, so a caller that
 *      expects the wrong subtype gets a {@code ClassCastException} at the call site.
 * @return Streamer instance matching the kind of configuration found.
 * @throws IgniteCheckedException If the set of configuration beans is invalid (none, both or several
 *      Ignite/client configurations; none or several streamer configurations). May also be propagated
 *      from URL resolution or bean loading.
 */
public static <T extends AbstractKafkaToIgniteCdcStreamer> T loadKafkaToIgniteStreamer(String springXmlPath)
    throws IgniteCheckedException {
    URL cfgUrl = U.resolveSpringUrl(springXmlPath);

    IgniteSpringHelper spring = SPRING.create(false);

    GridTuple3<Map<String, ?>, Map<Class<?>, Collection>, ? extends GridSpringResourceContext> cfgTuple =
        spring.loadBeans(cfgUrl, F.asList(KAFKA_PROPERTIES),
            IgniteConfiguration.class, ClientConfiguration.class, KafkaToIgniteCdcStreamerConfiguration.class);

    Collection<IgniteConfiguration> ignCfg = cfgTuple.get2().get(IgniteConfiguration.class);
    Collection<ClientConfiguration> clientCfg = cfgTuple.get2().get(ClientConfiguration.class);

    // Exactly one of the two connection kinds (node or thin client) must be configured.
    if (ignCfg.isEmpty() && clientCfg.isEmpty())
        throw new IgniteCheckedException("IgniteConfiguration or ClientConfiguration should be defined.");

    if (!ignCfg.isEmpty() && !clientCfg.isEmpty())
        throw new IgniteCheckedException("Either IgniteConfiguration or ClientConfiguration should be defined.");

    if (ignCfg.size() > 1)
        throw new IgniteCheckedException("Exact 1 IgniteConfiguration should be defined. Found " + ignCfg.size());

    if (clientCfg.size() > 1)
        throw new IgniteCheckedException("Exact 1 ClientConfiguration should be defined. Found " + clientCfg.size());

    Collection<KafkaToIgniteCdcStreamerConfiguration> k2iCfg =
        cfgTuple.get2().get(KafkaToIgniteCdcStreamerConfiguration.class);

    // Without this check a missing streamer configuration would surface below as an unchecked
    // NoSuchElementException from iterator().next() instead of a descriptive configuration error.
    if (k2iCfg.isEmpty())
        throw new IgniteCheckedException("KafkaToIgniteCdcStreamerConfiguration should be defined.");

    if (k2iCfg.size() > 1) {
        throw new IgniteCheckedException(
            "Exact 1 KafkaToIgniteCdcStreamerConfiguration configuration should be defined. " +
                "Found " + k2iCfg.size()
        );
    }

    KafkaToIgniteCdcStreamerConfiguration streamerCfg = k2iCfg.iterator().next();

    Properties kafkaProps = (Properties)cfgTuple.get1().get(KAFKA_PROPERTIES);

    // The checks above guarantee that exactly one of ignCfg/clientCfg holds a single element.
    if (ignCfg.isEmpty())
        return (T)new KafkaToIgniteClientCdcStreamer(clientCfg.iterator().next(), kafkaProps, streamerCfg);
    else
        return (T)new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(), kafkaProps, streamerCfg);
}