in src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java [59:81]
public void start(Map<String, String> props) {
  // Starts the sink task: builds the sink configuration from the supplied properties,
  // connects a Geode client cache through a fresh GeodeContext, and resolves the regions
  // for the configured topics.
  //
  // props: raw task configuration, passed as-is to GeodeSinkConnectorConfig.
  // Throws ConnectException when no client cache is returned or when any start-up step
  // fails; failures of other types are wrapped so the cause is preserved.
  logger.info("Starting Apache Geode sink task");
  try {
    GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
    // NOTE(review): presumably this populates topicToRegions, which is read further down;
    // confirm against configure().
    configure(geodeConnectorConfig);
    geodeContext = new GeodeContext();
    ClientCache clientCache =
        geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
            geodeConnectorConfig.getSecurityClientAuthInit(),
            geodeConnectorConfig.getSecurityUserName(),
            geodeConnectorConfig.getSecurityPassword(),
            geodeConnectorConfig.usesSecurity());
    if (clientCache == null) {
      throw new ConnectException("Unable to start client cache in the sink task");
    }
    regionNameToRegion = createProxyRegions(topicToRegions.values());
  } catch (ConnectException e) {
    // Already the exception type this method reports; rethrow untouched so the specific
    // message (e.g. the null-cache case above) is not buried under a generic wrapper.
    throw e;
  } catch (Exception e) {
    throw new ConnectException("Unable to start sink task", e);
  }
}