in gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java [213:350]
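/**
 * Creates Kafka work units, optionally restricted to a caller-supplied topic-to-partition filter.
 * When filteredTopicPartition is present, only the listed topics (and, per topic, the listed partition
 * IDs) are considered and no empty work units are created for skipped partitions; when it is absent,
 * topics are discovered from the job-level whitelist/blacklist configuration. The resulting work units
 * are packed into at least minContainer (if provided) multi-workunits.
 */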
public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,
Optional<Map<String, List<Integer>>> filteredTopicPartition, Optional<Integer> minContainer) {
this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
Map<String, List<Integer>> filteredTopicPartitionMap = filteredTopicPartition.or(new HashMap<>());
Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();
if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
String tableTypeStr =
state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
tableType = Extract.TableType.valueOf(tableTypeStr);
extractNamespace =
state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
} else {
// For backward compatibility, ignore the table type and namespace configuration keys and fall back to the defaults, as in the previous implementation
tableType = KafkaSource.DEFAULT_TABLE_TYPE;
extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
}
isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);
try {
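// Resolve and instantiate the configured consumer client factory, then create the client used for topic discovery below.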
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
.resolveClass(state.getProp(
GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
Collection<KafkaTopic> topics;
if (filteredTopicPartition.isPresent()) {
if (filteredTopicPartition.get().isEmpty()) {
// return an empty list as filteredTopicPartition is present but contains no valid entry
return new ArrayList<>();
} else {
// If filteredTopicPartition is present, use its topics to construct the whitelist pattern while leaving the blacklist empty
topics = this.kafkaConsumerClient.get()
.getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
}
} else {
// get topics based on job level config
topics = getValidTopics(getFilteredTopics(state), state);
}
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());
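// Look up dataset(topic)-specific properties so they can later be merged into each topic's workunits.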
Map<String, State> topicSpecificStateMap =
DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {
@Override
public String apply(KafkaTopic topic) {
return topic.getName();
}
}), state);
int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
// No need to allocate more threads than the number of topics, but use at least one
numOfThreads = Math.max(Math.min(numOfThreads, topics.size()), 1);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));
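// Either share the single discovery client across all worker threads or pre-allocate one client per thread, depending on the job configuration.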
if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
} else {
// preallocate one client per thread
populateClientPool(numOfThreads, kafkaConsumerClientFactory, config);
}
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
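// Submit one WorkUnitCreator task per topic; each task writes its workunits into kafkaTopicWorkunitMap, restricted to the filtered partition IDs when a filter was supplied.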
for (KafkaTopic topic : topics) {
LOG.info("Discovered topic {} with {} number of partitions", topic.getName(), topic.getPartitions().size());
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
Optional<Set<Integer>> partitionIDSet = Optional.absent();
if (filteredTopicPartition.isPresent()) {
List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
.orElse(new ArrayList<>());
partitionIDSet = Optional.of(new HashSet<>(list));
LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}",
topic.getName(), list.size());
}
threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
kafkaTopicWorkunitMap, partitionIDSet));
}
ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets
// but aren't processed). When filteredTopicPartition is present, only the filtered topic-partitions are needed, so skip this step.
if (!filteredTopicPartition.isPresent()) {
createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);
}
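// Pack the per-topic workunits into multi-workunits. The bin count is the caller-supplied minimum (default 1), raised to the mapper count derived from job configuration when one is set.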
KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = minContainer.or(1);
if (state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
}
addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
return workUnitList;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new RuntimeException("Checked exception caught", e);
} catch (Throwable t) {
throw new RuntimeException("Unexpected throwable caught, ", t);
} finally {
try {
GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get();
if (consumerClient != null) {
consumerClient.close();
}
// cleanup clients from pool
for (GobblinKafkaConsumerClient client: kafkaConsumerClientPool) {
client.close();
}
} catch (Throwable t) {
// Swallow any exceptions in the finally{..} block so that potential exceptions from the main try{..} block
// can be propagated
LOG.error("Exception encountered closing GobblinKafkaConsumerClient", t);
}
}
}
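A minimal, hypothetical caller sketch (not part of the source file): it restricts work-unit creation to two partitions of a single topic and asks for at least four containers. MyKafkaSource stands in for a concrete KafkaSource subclass, and sourceState is assumed to be a SourceState already populated with the usual Kafka broker and job configuration keys.

// Hypothetical usage sketch; MyKafkaSource and sourceState are assumptions for illustration only.
Map<String, List<Integer>> filter = new HashMap<>();
filter.put("my-topic", Arrays.asList(0, 1));                  // only partitions 0 and 1 of my-topic
List<WorkUnit> workUnits = new MyKafkaSource().getWorkunitsForFilteredPartitions(
    sourceState, Optional.of(filter), Optional.of(4));        // request at least 4 containers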