public void start()

in src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java [70:106]


  /**
   * Starts this source task from the supplied connector properties.
   *
   * <p>The properties are parsed into a {@link GeodeSourceConnectorConfig}, a new
   * {@link GeodeContext} is created and asked to connect a client cache, and the task's
   * batch size, shared event buffer, region-to-topic mapping and source partitions are
   * initialised from the configuration before {@code installOnGeode} is invoked.
   *
   * @param props the task configuration properties used to build the connector config
   * @throws ConnectException if the client cache could not be created (the connect call
   *         returned {@code null}), or wrapping any other exception raised during startup;
   *         a {@code ConnectException} raised during startup is rethrown as-is
   */
  public void start(Map<String, String> props) {
    logger.info("Starting Apache Geode source task");
    try {
      GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
      logger.debug("GeodeKafkaSourceTask id:" + config.getTaskId() + " starting");

      // The context is stored on the task before connecting, so the field is set even if
      // the connection attempt below fails.
      geodeContext = new GeodeContext();
      ClientCache cache = geodeContext.connectClient(
          config.getLocatorHostPorts(),
          config.getDurableClientId(),
          config.getDurableClientTimeout(),
          config.getSecurityClientAuthInit(),
          config.getSecurityUserName(),
          config.getSecurityPassword(),
          config.usesSecurity());
      if (cache == null) {
        throw new ConnectException("Unable to create client cache in the source task");
      }

      batchSize = config.getBatchSize();
      eventBufferSupplier = new SharedEventBufferSupplier(config.getQueueSize());
      regionToTopics = config.getRegionToTopics();
      sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());

      installOnGeode(config, geodeContext, eventBufferSupplier, config.getCqPrefix(),
          config.getLoadEntireRegion());
      logger.info("Started Apache Geode source task");
    } catch (ConnectException e) {
      // Already the exception type callers expect: log and propagate unchanged.
      logger.error("Unable to start source task", e);
      throw e;
    } catch (Exception e) {
      // Anything else is logged and wrapped so callers only ever see ConnectException.
      logger.error("Unable to start source task", e);
      throw new ConnectException(e);
    }
  }