in metacat-main/src/main/java/com/netflix/metacat/main/services/impl/PartitionServiceImpl.java [139:216]
public List<PartitionDto> list(
    final QualifiedName name,
    @Nullable final Sort sort,
    @Nullable final Pageable pageable,
    final boolean includeUserDefinitionMetadata,
    final boolean includeUserDataMetadata,
    @Nullable final GetPartitionsRequestDto getPartitionsRequestDto
) {
    // Lists the partitions of the table `name` and, on request, decorates every returned DTO with
    // user definition metadata (keyed by partition name) and data metadata (keyed by data URI, or
    // taken from the data metrics the connector already returned for that partition).
    //   name                          - table whose partitions are listed
    //   sort, pageable,
    //   getPartitionsRequestDto       - folded into the connector request; each may be null
    //   includeUserDefinitionMetadata - whether to look up definition metadata
    //   includeUserDataMetadata       - whether to look up data metadata
    // Returns an empty list when the connector yields no partitions.
    // Throws IllegalArgumentException when neither a filter, a page, nor partition names are given
    // for a table configured to reject such listings. Failures while fetching or applying metadata
    // propagate as unchecked exceptions (checked ones wrapped in RuntimeException).

    // The conversion handles a null getPartitionsRequestDto.
    final PartitionListRequest partitionListRequest =
        converterUtil.toPartitionListRequest(getPartitionsRequestDto, pageable, sort);
    final String filterExpression = partitionListRequest.getFilter();
    final List<String> partitionNames = partitionListRequest.getPartitionNames();
    if (Strings.isNullOrEmpty(filterExpression)
        && (pageable == null || !pageable.isPageable())
        && (partitionNames == null || partitionNames.isEmpty())
        && config.getNamesToThrowErrorOnListPartitionsWithNoFilter().contains(name)) {
        throw new IllegalArgumentException(String.format("No filter or limit specified for table %s", name));
    }
    final MetacatRequestContext metacatRequestContext = MetacatContextManager.getContext();
    final ConnectorPartitionService service = connectorManager.getPartitionService(name);
    final ConnectorRequestContext connectorRequestContext = converterUtil.toConnectorContext(metacatRequestContext);
    // The table info is handed to the connector together with the list request.
    final List<PartitionInfo> resultInfo = service
        .getPartitions(connectorRequestContext, name, partitionListRequest, this.getTableInfo(name));
    List<PartitionDto> result = Lists.newArrayList();
    if (resultInfo != null && !resultInfo.isEmpty()) {
        result = resultInfo.stream().map(converterUtil::toPartitionDto).collect(Collectors.toList());
        final List<QualifiedName> names = Lists.newArrayList();
        final List<String> uris = Lists.newArrayList();
        // Data metrics returned by the connector, keyed by partition name.
        final Map<String, ObjectNode> prePopulatedMap = new HashMap<>();
        resultInfo.stream().filter(partitionInfo -> partitionInfo.getDataMetrics() != null)
            .forEach(partitionInfo ->
                prePopulatedMap.put(partitionInfo.getName().toString(), partitionInfo.getDataMetrics()));
        // Every partition name is a definition-metadata key; only external data URIs are
        // data-metadata keys.
        result.forEach(partitionDto -> {
            names.add(partitionDto.getName());
            if (partitionDto.isDataExternal()) {
                uris.add(partitionDto.getDataUri());
            }
        });
        registry.distributionSummary(
            this.partitionGetDistSummary.withTags(name.parts())).record(result.size());
        log.info("Got {} partitions for {} using filter: {} and partition names: {}",
            result.size(), name, filterExpression,
            partitionNames);
        if (includeUserDefinitionMetadata || includeUserDataMetadata) {
            // Both lookups are submitted to the executor; a lookup that was not requested
            // resolves to an empty map.
            final ListenableFuture<Map<String, ObjectNode>> definitionFuture =
                threadServiceManager.getExecutor().submit(() -> includeUserDefinitionMetadata
                    ? userMetadataService.getDefinitionMetadataMap(names)
                    : Maps.newHashMap());
            final ListenableFuture<Map<String, ObjectNode>> dataFuture =
                threadServiceManager.getExecutor().submit(() -> includeUserDataMetadata
                    ? userMetadataService.getDataMetadataMap(uris)
                    : Maps.newHashMap());
            final List<ListenableFuture<Map<String, ObjectNode>>> futures = Lists.newArrayList();
            futures.add(definitionFuture);
            futures.add(dataFuture);
            try {
                // Bounded wait until both lookups have finished. successfulAsList reports a failed
                // lookup as a null entry, so the results are read from the completed futures
                // themselves: that rethrows the lookup's own failure instead of a
                // NullPointerException with the cause lost.
                Futures.successfulAsList(futures).get(1, TimeUnit.HOURS);
                final Map<String, ObjectNode> definitionMetadataMap = definitionFuture.get();
                for (final PartitionDto partitionDto : result) {
                    final String partitionName = partitionDto.getName().toString();
                    // Connector-supplied data metrics take precedence; the data lookup is consulted
                    // (and its failure raised) only for partitions that have no such metrics.
                    final ObjectNode dataMetadata = prePopulatedMap.containsKey(partitionName)
                        ? prePopulatedMap.get(partitionName)
                        : dataFuture.get().get(partitionDto.getDataUri());
                    userMetadataService.populateMetadata(partitionDto,
                        definitionMetadataMap.get(partitionName), dataMetadata);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
    }
    return result;
}