in src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java [70:106]
public void start(Map<String, String> props) {
  // Builds the connector configuration from props, connects a Geode client cache through a
  // fresh GeodeContext, initialises this task's batch size, event buffer supplier,
  // region-to-topic mapping and source partitions, and finally delegates to installOnGeode.
  // Every failure is logged and leaves this method as a ConnectException: one raised inside
  // the try block is rethrown unchanged, any other exception is wrapped as the cause.
  logger.info("Starting Apache Geode source task");
  try {
    GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
    logger.debug("GeodeKafkaSourceTask id:" + config.getTaskId() + " starting");

    geodeContext = new GeodeContext();
    ClientCache cache = geodeContext.connectClient(
        config.getLocatorHostPorts(),
        config.getDurableClientId(),
        config.getDurableClientTimeout(),
        config.getSecurityClientAuthInit(),
        config.getSecurityUserName(),
        config.getSecurityPassword(),
        config.usesSecurity());
    // connectClient handing back null is treated as a failed connection.
    if (cache == null) {
      throw new ConnectException("Unable to create client cache in the source task");
    }

    batchSize = config.getBatchSize();
    eventBufferSupplier = new SharedEventBufferSupplier(config.getQueueSize());
    regionToTopics = config.getRegionToTopics();
    // One source partition entry is derived per configured region name.
    sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());

    String prefix = config.getCqPrefix();
    boolean loadAll = config.getLoadEntireRegion();
    installOnGeode(config, geodeContext, eventBufferSupplier, prefix, loadAll);
    logger.info("Started Apache Geode source task");
  } catch (ConnectException connectFailure) {
    // Already the exception type callers expect; log and propagate untouched.
    logger.error("Unable to start source task", connectFailure);
    throw connectFailure;
  } catch (Exception unexpected) {
    // Anything else is wrapped so the original cause is preserved.
    logger.error("Unable to start source task", unexpected);
    throw new ConnectException(unexpected);
  }
}