private void runQueryStrategy()

in pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java [142:322]


  /**
   * Adds inverted indexes to tables using query results: for each matching table, queries the per-dimension distinct
   * count on the rows holding the maximum time column value, and adds an inverted index to the dimensions whose
   * cardinality passes the configured threshold.
   *
   * <p>Per-table handling:
   * <ul>
   *   <li>Helix resources that are not tables, and tables not matching {@code _tableNamePattern} (when set), are
   *   skipped.</li>
   *   <li>Tables whose inverted index columns are flagged as auto-generated: {@code NEW} mode skips them,
   *   {@code REMOVE} mode clears the columns and persists the change, {@code REFRESH} mode clears the columns and
   *   recomputes them, and any other mode keeps the existing columns and appends new ones.</li>
   *   <li>Tables with a non-empty, non-auto-generated inverted index column list (after dropping empty strings) are
   *   left untouched.</li>
   *   <li>Tables without a schema, without dimension columns, without a time column in the validation config, whose
   *   time column spec is missing or of STRING type, or whose total document count is not above
   *   {@code _tableSizeThreshold} are skipped.</li>
   * </ul>
   *
   * <p>At most {@code _maxNumInvertedIndexAdded} columns are considered; only columns whose distinct count exceeds
   * {@code _cardinalityThreshold} are added. Changes are persisted through {@code updateIndexConfig}; a failed update
   * is logged, not thrown.
   *
   * @throws Exception as declared by this method; presumably raised by the query helper {@code sendQuery}
   *     (NOTE(review): confirm against that helper)
   * @throws NullPointerException if a table's config cannot be found in the property store
   * @throws IllegalStateException if a table is flagged as having an auto-generated inverted index but its column
   *     list is missing or empty
   */
  private void runQueryStrategy()
      throws Exception {
    // Get all resources in cluster
    List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(_clusterName);

    for (String tableNameWithType : resourcesInCluster) {
      // Skip non-table resources
      if (!TableNameBuilder.isTableResource(tableNameWithType)) {
        continue;
      }

      // Skip tables that do not match the defined name pattern
      if (_tableNamePattern != null && !tableNameWithType.matches(_tableNamePattern)) {
        continue;
      }
      LOGGER.info("Table: {} matches the table name pattern: {}", tableNameWithType, _tableNamePattern);

      // Get the inverted index config
      TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
      Preconditions.checkNotNull(tableConfig, "Failed to find table config for table: %s", tableNameWithType);
      IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
      List<String> invertedIndexColumns = indexingConfig.getInvertedIndexColumns();
      boolean autoGeneratedInvertedIndex = indexingConfig.isAutoGeneratedInvertedIndex();

      // Handle auto-generated inverted index
      if (autoGeneratedInvertedIndex) {
        // Null-safe: a config flagged as auto-generated must carry a non-empty column list
        Preconditions.checkState(invertedIndexColumns != null && !invertedIndexColumns.isEmpty(),
            "Table: %s, auto-generated inverted index list is empty", tableNameWithType);

        // NEW mode, skip
        if (_mode == Mode.NEW) {
          LOGGER.info(
              "Table: {}, skip adding inverted index because it has auto-generated inverted index and under NEW mode",
              tableNameWithType);
          continue;
        }

        // REMOVE mode, remove the inverted index and update
        if (_mode == Mode.REMOVE) {
          invertedIndexColumns.clear();
          indexingConfig.setAutoGeneratedInvertedIndex(false);
          if (updateIndexConfig(tableNameWithType, tableConfig)) {
            LOGGER.info("Table: {}, removed auto-generated inverted index", tableNameWithType);
          } else {
            LOGGER.error("Table: {}, failed to remove auto-generated inverted index", tableNameWithType);
          }
          continue;
        }

        // REFRESH mode, remove auto-generated inverted index so it is recomputed from scratch below.
        // Any other mode keeps the existing columns and appends to them.
        if (_mode == Mode.REFRESH) {
          invertedIndexColumns.clear();
        }
      } else {
        // Handle null inverted index columns
        if (invertedIndexColumns == null) {
          invertedIndexColumns = new ArrayList<>();
          indexingConfig.setInvertedIndexColumns(invertedIndexColumns);
        }

        // Remove empty strings
        int emptyStringIndex;
        while ((emptyStringIndex = invertedIndexColumns.indexOf("")) != -1) {
          invertedIndexColumns.remove(emptyStringIndex);
        }

        // Skip non-empty non-auto-generated inverted index: never override a manually configured list
        if (!invertedIndexColumns.isEmpty()) {
          LOGGER.info("Table: {}, skip adding inverted index because it has non-auto-generated inverted index",
              tableNameWithType);
          continue;
        }
      }

      // Skip tables without a schema
      Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
      if (tableSchema == null) {
        LOGGER.info("Table: {}, skip adding inverted index because it does not have a schema", tableNameWithType);
        continue;
      }

      // Skip tables without dimensions
      List<String> dimensionNames = tableSchema.getDimensionNames();
      if (dimensionNames.isEmpty()) {
        LOGGER.info("Table: {}, skip adding inverted index because it does not have any dimension column",
            tableNameWithType);
        continue;
      }

      // Skip tables without a proper time column
      String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
      if (timeColumnName == null) {
        LOGGER.info(
            "Table: {}, skip adding inverted index because it does not have a time column specified in the table "
                + "config", tableNameWithType);
        continue;
      }
      DateTimeFieldSpec dateTimeSpec = tableSchema.getSpecForTimeColumn(timeColumnName);
      if (dateTimeSpec == null || dateTimeSpec.getDataType() == FieldSpec.DataType.STRING) {
        LOGGER.info("Table: {}, skip adding inverted index because it does not have a numeric time column",
            tableNameWithType);
        continue;
      }
      TimeUnit timeUnit = dateTimeSpec.getFormatSpec().getColumnUnit();
      if (timeUnit != TimeUnit.DAYS) {
        // Only a warning: processing continues, but the cardinality is measured over a single time value
        LOGGER.warn("Table: {}, time column {} has non-DAYS time unit: {}", tableNameWithType, timeColumnName,
            timeUnit);
      }

      // Only add inverted index to table larger than a threshold
      JsonNode queryResponse = sendQuery("SELECT COUNT(*) FROM " + tableNameWithType);
      long numTotalDocs = queryResponse.get("totalDocs").asLong();
      LOGGER.info("Table: {}, number of total documents: {}", tableNameWithType, numTotalDocs);
      if (numTotalDocs <= _tableSizeThreshold) {
        LOGGER.info("Table: {}, skip adding inverted index because the table is too small", tableNameWithType);
        continue;
      }

      // Get each dimension's cardinality on one timestamp's data
      queryResponse = sendQuery("SELECT Max(" + timeColumnName + ") FROM " + tableNameWithType);
      long maxTimeStamp = queryResponse.get("aggregationResults").get(0).get("value").asLong();

      LOGGER.info("Table: {}, max time column {}: {}", tableNameWithType, timeColumnName, maxTimeStamp);

      // Query DISTINCTCOUNT on all dimensions in one query might cause timeout, so query them separately
      List<ResultPair> resultPairs = new ArrayList<>(dimensionNames.size());
      for (String dimensionName : dimensionNames) {
        String query =
            "SELECT DISTINCTCOUNT(" + dimensionName + ") FROM " + tableNameWithType + " WHERE " + timeColumnName + " = "
                + maxTimeStamp;
        queryResponse = sendQuery(query);
        JsonNode result = queryResponse.get("aggregationResults").get(0);
        // Key by the dimension we queried rather than parsing the broker's function label, which may vary
        resultPairs.add(new ResultPair(dimensionName, result.get("value").asLong()));
      }

      // Sort the dimensions based on their cardinalities.
      // NOTE(review): the early 'break' below assumes ResultPair's natural order is descending by value — confirm.
      Collections.sort(resultPairs);

      // Add the top dimensions into inverted index columns
      int numInvertedIndex = Math.min(_maxNumInvertedIndexAdded, resultPairs.size());
      for (int i = 0; i < numInvertedIndex; i++) {
        ResultPair resultPair = resultPairs.get(i);
        String columnName = resultPair._key;
        long cardinality = resultPair._value;
        if (cardinality > _cardinalityThreshold) {
          // Do not append inverted index if already exists
          if (!invertedIndexColumns.contains(columnName)) {
            invertedIndexColumns.add(columnName);
          }
          LOGGER.info("Table: {}, add inverted index to column {} with cardinality: {}", tableNameWithType, columnName,
              cardinality);
        } else {
          LOGGER.info("Table: {}, skip adding inverted index to column {} with cardinality: {}", tableNameWithType,
              columnName, cardinality);
          break;
        }
      }

      // Update indexing config
      if (!invertedIndexColumns.isEmpty()) {
        indexingConfig.setAutoGeneratedInvertedIndex(true);
        if (updateIndexConfig(tableNameWithType, tableConfig)) {
          LOGGER.info("Table: {}, added inverted index to columns: {}", tableNameWithType, invertedIndexColumns);
        } else {
          LOGGER.error("Table: {}, failed to add inverted index to columns: {}", tableNameWithType,
              invertedIndexColumns);
        }
      } else {
        if (autoGeneratedInvertedIndex) {
          // An auto-generated list can only be empty here if REFRESH cleared it above
          Preconditions.checkState(_mode == Mode.REFRESH);

          // Remove existing auto-generated inverted index because no column matches all the conditions
          indexingConfig.setAutoGeneratedInvertedIndex(false);
          if (updateIndexConfig(tableNameWithType, tableConfig)) {
            LOGGER.info("Table: {}, removed auto-generated inverted index", tableNameWithType);
          } else {
            LOGGER.error("Table: {}, failed to remove auto-generated inverted index", tableNameWithType);
          }
        }
      }
    }
  }