public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Optional<Map<String, List<Integer>>> filteredTopicPartition, Optional<Integer> minContainer)

in gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java [213:350]


  public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,
      Optional<Map<String, List<Integer>>> filteredTopicPartition, Optional<Integer> minContainer) {
    this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

    Map<String, List<Integer>> filteredTopicPartitionMap = filteredTopicPartition.or(new HashMap<>());
    Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();
    if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
      String tableTypeStr =
          state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
      tableType = Extract.TableType.valueOf(tableTypeStr);
      extractNamespace =
          state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
    } else {
      // For backward compatibility with the previous implementation, ignore the table type and namespace configuration keys and use the defaults
      tableType = KafkaSource.DEFAULT_TABLE_TYPE;
      extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
    }
    isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
    kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
    this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
        DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);

    try {
      Config config = ConfigUtils.propertiesToConfig(state.getProperties());
      GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
          .resolveClass(state.getProp(
              GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
              DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();

      this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));

      Collection<KafkaTopic> topics;
      if (filteredTopicPartition.isPresent()) {
        if (filteredTopicPartition.get().isEmpty()) {
          // Return an empty list: filteredTopicPartition is present but contains no valid entry
          return new ArrayList<>();
        } else {
          // If filteredTopicPartition is present, use it to construct the whitelist patterns while leaving the blacklist empty
          topics = this.kafkaConsumerClient.get()
              .getFilteredTopics(Collections.emptyList(),
                  filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
        }
      } else {
        // Get topics based on the job-level config
        topics = getValidTopics(getFilteredTopics(state), state);
      }
      this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());

      Map<String, State> topicSpecificStateMap =
          DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, KafkaTopic::getName), state);

      int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
          ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
      // No need to allocate more threads than the number of topics, but use at least one
      numOfThreads = Math.max(Math.min(numOfThreads, topics.size()), 1);
      ExecutorService threadPool =
          Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

      if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
          ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
        this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
      } else {
        // preallocate one client per thread
        populateClientPool(numOfThreads, kafkaConsumerClientFactory, config);
      }

      Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

      for (KafkaTopic topic : topics) {
        LOG.info("Discovered topic {} with {} number of partitions", topic.getName(), topic.getPartitions().size());
        if (topic.getTopicSpecificState().isPresent()) {
          topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
              .addAllIfNotExist(topic.getTopicSpecificState().get());
        }
        Optional<Set<Integer>> partitionIDSet = Optional.absent();
        if (filteredTopicPartition.isPresent()) {
          List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
              .orElse(new ArrayList<>());
          partitionIDSet = Optional.of(new HashSet<>(list));
          LOG.info("Computing workunits for topic {} with {} filtered partitions",
              topic.getName(), list.size());
        }

        threadPool.submit(
            new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
                kafkaTopicWorkunitMap, partitionIDSet));
      }

      ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
      LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
          createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));

      // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets
      // but aren't processed). When filteredTopicPartition is present, only the filtered
      // topic-partitions are needed, so skip this call.
      if (!filteredTopicPartition.isPresent()) {
        createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);
      }

      KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
      int numOfMultiWorkunits = minContainer.or(1);
      if (state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
        numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
            calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
      }

      addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
      List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
      setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
      return workUnitList;
    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
      throw new RuntimeException("Checked exception caught", e);
    } catch (Throwable t) {
      throw new RuntimeException("Unexpected throwable caught, ", t);
    } finally {
      try {
        GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get();
        if (consumerClient != null) {
          consumerClient.close();
        }
        // cleanup clients from pool
        for (GobblinKafkaConsumerClient client : kafkaConsumerClientPool) {
          client.close();
        }
      } catch (Throwable t) {
        // Swallow any exceptions in the finally block to allow potential exceptions from the main
        // try block to be propagated
        LOG.error("Exception encountered closing GobblinKafkaConsumerClient", t);
      }
    }

  }
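
Usage sketch (not part of KafkaSource.java): a minimal caller, assuming gobblin-kafka-common is on the classpath and a concrete KafkaSource subclass is available. The helper class, topic names, and partition ids below are hypothetical; only the getWorkunitsForFilteredPartitions call comes from the method above. It illustrates the expected shape of the filteredTopicPartition argument (topic name -> partition ids to pull) and that an absent minContainer falls back to a single multi-workunit unless ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY raises it.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.base.Optional;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.workunit.WorkUnit;

// Hypothetical helper class, written for illustration only.
public class FilteredPartitionPullSketch {

  /**
   * Asks the given source for workunits covering only the listed topic-partitions.
   * The source must be a concrete KafkaSource implementation configured by the job state.
   */
  static List<WorkUnit> pullFilteredPartitions(KafkaSource<?, ?> source, SourceState state) {
    // Topic name -> partition ids to pull; the names and ids here are made up.
    Map<String, List<Integer>> filtered = new HashMap<>();
    filtered.put("tracking-events", Arrays.asList(0, 1, 2));
    filtered.put("metrics", Collections.singletonList(3));

    // An empty map would return an empty list; an absent Optional would fall back to
    // the job-level topic whitelist/blacklist instead of the map's keys.
    return source.getWorkunitsForFilteredPartitions(
        state, Optional.of(filtered), Optional.<Integer>absent());
  }
}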