in metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/DirectSqlGetPartition.java [424:559]
private List<PartitionHolder> getPartitions(
final String databaseName,
final String tableName,
@Nullable final List<String> partitionIds,
@Nullable final String filterExpression,
@Nullable final Sort sort,
@Nullable final Pageable pageable,
final boolean includePartitionDetails,
final boolean forceDisableAudit
) {
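// Select the filter evaluator based on whether partition names should be escaped when evaluating the filter expression.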
final FilterPartition filter = config.escapePartitionNameOnFilter() ? new HiveFilterPartition()
: new FilterPartition();
// Determine whether the filter expression references the batch id or date-created fields, so the
// in-memory filter can evaluate against those values.
final boolean isBatched = !Strings.isNullOrEmpty(filterExpression) && filterExpression.contains(FIELD_BATCHID);
final boolean hasDateCreated =
!Strings.isNullOrEmpty(filterExpression) && filterExpression.contains(FIELD_DATE_CREATED);
// Handler for reading the result set
final ResultSetExtractor<List<PartitionHolder>> handler = rs -> {
final List<PartitionHolder> result = Lists.newArrayList();
final QualifiedName tableQName = QualifiedName.ofTable(catalogName, databaseName, tableName);
int noOfRows = 0;
while (rs.next()) {
noOfRows++;
final String name = rs.getString("name");
final String uri = rs.getString("uri");
final long createdDate = rs.getLong(FIELD_DATE_CREATED);
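// When the filter references the date-created field, expose the created timestamp to the expression evaluator.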
Map<String, String> values = null;
if (hasDateCreated) {
values = Maps.newHashMap();
values.put(FIELD_DATE_CREATED, String.valueOf(createdDate));
}
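// Apply the filter expression in memory; rows that do not match are skipped.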
if (Strings.isNullOrEmpty(filterExpression)
|| filter.evaluatePartitionExpression(filterExpression, name, uri, isBatched, values)) {
final Long id = rs.getLong("id");
final Long sdId = rs.getLong("sd_id");
final Long serdeId = rs.getLong("serde_id");
final String inputFormat = rs.getString("input_format");
final String outputFormat = rs.getString("output_format");
final String serializationLib = rs.getString("slib");
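// Assemble the storage and audit information for the matched partition.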
final StorageInfo storageInfo = new StorageInfo();
storageInfo.setUri(uri);
storageInfo.setInputFormat(inputFormat);
storageInfo.setOutputFormat(outputFormat);
storageInfo.setSerializationLib(serializationLib);
final AuditInfo auditInfo = new AuditInfo();
auditInfo.setCreatedDate(Date.from(Instant.ofEpochSecond(createdDate)));
auditInfo.setLastModifiedDate(Date.from(Instant.ofEpochSecond(createdDate)));
result.add(new PartitionHolder(id, sdId, serdeId,
PartitionInfo.builder().name(QualifiedName.ofPartition(catalogName,
databaseName, tableName, name)).auditInfo(auditInfo).serde(storageInfo).build()));
}
// Fail if the number of matching partitions exceeds the configured threshold.
if (result.size() > config.getMaxPartitionsThreshold()) {
registry.counter(registry.createId(HiveMetrics.CounterHiveGetPartitionsExceedThresholdFailure
.getMetricName()).withTags(tableQName.parts())).increment();
final String message =
String.format("Number of partitions queried for table %s exceeded the threshold %d",
tableQName, config.getMaxPartitionsThreshold());
log.warn(message);
throw new IllegalArgumentException(message);
}
}
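// Record the number of rows read before the in-memory filter was applied.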
registry.gauge(registry.createId(HiveMetrics.GaugePreExpressionFilterGetPartitionsCount
.getMetricName()).withTags(tableQName.parts())).set(noOfRows);
return result;
};
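// Run the partitions query and extract the matching partitions with the handler above.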
final List<PartitionHolder> partitions = this.getHandlerResults(
databaseName,
tableName,
filterExpression,
partitionIds,
SQL.SQL_GET_PARTITIONS,
handler,
sort,
pageable,
forceDisableAudit
);
if (includePartitionDetails && !partitions.isEmpty()) {
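// Collect the ids of each partition, its storage descriptor and its serde for the bulk parameter lookups below.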
final List<Long> partIds = Lists.newArrayListWithCapacity(partitions.size());
final List<Long> sdIds = Lists.newArrayListWithCapacity(partitions.size());
final List<Long> serdeIds = Lists.newArrayListWithCapacity(partitions.size());
for (PartitionHolder partitionHolder : partitions) {
partIds.add(partitionHolder.getId());
sdIds.add(partitionHolder.getSdId());
serdeIds.add(partitionHolder.getSerdeId());
}
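// Fetch the parameter maps for partitions, storage descriptors and serdes in parallel on the shared executor.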
final List<ListenableFuture<Void>> futures = Lists.newArrayList();
final Map<Long, Map<String, String>> partitionParams = Maps.newHashMap();
futures.add(threadServiceManager.getExecutor().submit(() ->
populateParameters(partIds, SQL.SQL_GET_PARTITION_PARAMS,
"part_id", partitionParams)));
final Map<Long, Map<String, String>> sdParams = Maps.newHashMap();
if (!sdIds.isEmpty()) {
futures.add(threadServiceManager.getExecutor().submit(() ->
populateParameters(sdIds, SQL.SQL_GET_SD_PARAMS,
"sd_id", sdParams)));
}
final Map<Long, Map<String, String>> serdeParams = Maps.newHashMap();
if (!serdeIds.isEmpty()) {
futures.add(threadServiceManager.getExecutor().submit(() ->
populateParameters(serdeIds, SQL.SQL_GET_SERDE_PARAMS,
"serde_id", serdeParams)));
}
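// Block until all parameter lookups complete, bounded by the configured timeout (120 seconds by default);
// cancel the outstanding work and propagate the error on failure.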
ListenableFuture<List<Void>> future = null;
try {
future = Futures.allAsList(futures);
final int getPartitionsDetailsTimeout = Integer.parseInt(configuration
.getOrDefault(HiveConfigConstants.GET_PARTITION_DETAILS_TIMEOUT, "120"));
future.get(getPartitionsDetailsTimeout, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
try {
if (future != null) {
future.cancel(true);
}
} catch (Exception ce) {
log.warn("Failed cancelling the task that gets the partition details.", ce);
}
Throwables.propagate(e);
}
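// Attach the fetched parameter maps to each partition's metadata, serde parameters and serde-info parameters.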
for (PartitionHolder partitionHolder : partitions) {
partitionHolder.getPartitionInfo().setMetadata(partitionParams.get(partitionHolder.getId()));
partitionHolder.getPartitionInfo().getSerde()
.setParameters(sdParams.get(partitionHolder.getSdId()));
partitionHolder.getPartitionInfo().getSerde()
.setSerdeInfoParameters(serdeParams.get(partitionHolder.getSerdeId()));
}
}
return partitions;
}