private List getPartitions()

in metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/DirectSqlGetPartition.java [424:559]


    private List<PartitionHolder> getPartitions(
        final String databaseName,
        final String tableName,
        @Nullable final List<String> partitionIds,
        @Nullable final String filterExpression,
        @Nullable final Sort sort,
        @Nullable final Pageable pageable,
        final boolean includePartitionDetails,
        final boolean forceDisableAudit
    ) {
        final FilterPartition filter = config.escapePartitionNameOnFilter() ? new HiveFilterPartition()
            : new FilterPartition();
        // batch exists
        final boolean isBatched = !Strings.isNullOrEmpty(filterExpression) && filterExpression.contains(FIELD_BATCHID);
        final boolean hasDateCreated =
            !Strings.isNullOrEmpty(filterExpression) && filterExpression.contains(FIELD_DATE_CREATED);
        // Handler for reading the result set
        final ResultSetExtractor<List<PartitionHolder>> handler = rs -> {
            final List<PartitionHolder> result = Lists.newArrayList();
            final QualifiedName tableQName = QualifiedName.ofTable(catalogName, databaseName, tableName);
            int noOfRows = 0;
            while (rs.next()) {
                noOfRows++;
                final String name = rs.getString("name");
                final String uri = rs.getString("uri");
                final long createdDate = rs.getLong(FIELD_DATE_CREATED);
                Map<String, String> values = null;
                if (hasDateCreated) {
                    values = Maps.newHashMap();
                    values.put(FIELD_DATE_CREATED, createdDate + "");
                }
                if (Strings.isNullOrEmpty(filterExpression)
                    || filter.evaluatePartitionExpression(filterExpression, name, uri, isBatched, values)) {
                    final Long id = rs.getLong("id");
                    final Long sdId = rs.getLong("sd_id");
                    final Long serdeId = rs.getLong("serde_id");
                    final String inputFormat = rs.getString("input_format");
                    final String outputFormat = rs.getString("output_format");
                    final String serializationLib = rs.getString("slib");
                    final StorageInfo storageInfo = new StorageInfo();
                    storageInfo.setUri(uri);
                    storageInfo.setInputFormat(inputFormat);
                    storageInfo.setOutputFormat(outputFormat);
                    storageInfo.setSerializationLib(serializationLib);
                    final AuditInfo auditInfo = new AuditInfo();
                    auditInfo.setCreatedDate(Date.from(Instant.ofEpochSecond(createdDate)));
                    auditInfo.setLastModifiedDate(Date.from(Instant.ofEpochSecond(createdDate)));

                    result.add(new PartitionHolder(id, sdId, serdeId,
                        PartitionInfo.builder().name(QualifiedName.ofPartition(catalogName,
                            databaseName, tableName, name)).auditInfo(auditInfo).serde(storageInfo).build()));
                }

                // Fail if the number of partitions exceeds the threshold limit.
                if (result.size() > config.getMaxPartitionsThreshold()) {
                    registry.counter(registry.createId(HiveMetrics.CounterHiveGetPartitionsExceedThresholdFailure
                            .getMetricName()).withTags(tableQName.parts())).increment();
                    final String message =
                        String.format("Number of partitions queried for table %s exceeded the threshold %d",
                            tableQName, config.getMaxPartitionsThreshold());
                    log.warn(message);
                    throw new IllegalArgumentException(message);
                }
            }
            registry.gauge(registry.createId(HiveMetrics.GaugePreExpressionFilterGetPartitionsCount
                .getMetricName()).withTags(tableQName.parts())).set(noOfRows);
            return result;
        };

        final List<PartitionHolder> partitions = this.getHandlerResults(
            databaseName,
            tableName,
            filterExpression,
            partitionIds,
            SQL.SQL_GET_PARTITIONS,
            handler,
            sort,
            pageable,
            forceDisableAudit
        );

        if (includePartitionDetails && !partitions.isEmpty()) {
            final List<Long> partIds = Lists.newArrayListWithCapacity(partitions.size());
            final List<Long> sdIds = Lists.newArrayListWithCapacity(partitions.size());
            final List<Long> serdeIds = Lists.newArrayListWithCapacity(partitions.size());
            for (PartitionHolder partitionHolder : partitions) {
                partIds.add(partitionHolder.getId());
                sdIds.add(partitionHolder.getSdId());
                serdeIds.add(partitionHolder.getSerdeId());
            }
            final List<ListenableFuture<Void>> futures = Lists.newArrayList();
            final Map<Long, Map<String, String>> partitionParams = Maps.newHashMap();
            futures.add(threadServiceManager.getExecutor().submit(() ->
                populateParameters(partIds, SQL.SQL_GET_PARTITION_PARAMS,
                    "part_id", partitionParams)));

            final Map<Long, Map<String, String>> sdParams = Maps.newHashMap();
            if (!sdIds.isEmpty()) {
                futures.add(threadServiceManager.getExecutor().submit(() ->
                    populateParameters(sdIds, SQL.SQL_GET_SD_PARAMS,
                        "sd_id", sdParams)));
            }
            final Map<Long, Map<String, String>> serdeParams = Maps.newHashMap();
            if (!serdeIds.isEmpty()) {
                futures.add(threadServiceManager.getExecutor().submit(() ->
                    populateParameters(serdeIds, SQL.SQL_GET_SERDE_PARAMS,
                        "serde_id", serdeParams)));
            }
            ListenableFuture<List<Void>> future = null;
            try {
                future = Futures.allAsList(futures);
                final int getPartitionsDetailsTimeout = Integer.parseInt(configuration
                    .getOrDefault(HiveConfigConstants.GET_PARTITION_DETAILS_TIMEOUT, "120"));
                future.get(getPartitionsDetailsTimeout, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                try {
                    if (future != null) {
                        future.cancel(true);
                    }
                } catch (Exception ignored) {
                    log.warn("Failed cancelling the task that gets the partition details.");
                }
                Throwables.propagate(e);
            }

            for (PartitionHolder partitionHolder : partitions) {
                partitionHolder.getPartitionInfo().setMetadata(partitionParams.get(partitionHolder.getId()));
                partitionHolder.getPartitionInfo().getSerde()
                    .setParameters(sdParams.get(partitionHolder.getSdId()));
                partitionHolder.getPartitionInfo().getSerde()
                    .setSerdeInfoParameters(serdeParams.get(partitionHolder.getSerdeId()));

            }
        }
        return partitions;
    }