public static void validateIngestionConfig()

in pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java [334:596]


  public static void validateIngestionConfig(TableConfig tableConfig, Schema schema) {
    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();

    if (ingestionConfig != null) {
      String tableNameWithType = tableConfig.getTableName();

      // Batch
      if (ingestionConfig.getBatchIngestionConfig() != null) {
        BatchIngestionConfig cfg = ingestionConfig.getBatchIngestionConfig();
        List<Map<String, String>> batchConfigMaps = cfg.getBatchConfigMaps();
        try {
          if (CollectionUtils.isNotEmpty(batchConfigMaps)) {
            // Validate that BatchConfig can be created
            batchConfigMaps.forEach(b -> new BatchConfig(tableNameWithType, b));
          }
        } catch (Exception e) {
          throw new IllegalStateException("Could not create BatchConfig using the batchConfig map", e);
        }
        if (tableConfig.isDimTable()) {
          Preconditions.checkState(cfg.getSegmentIngestionType().equalsIgnoreCase("REFRESH"),
              "Dimension tables must have segment ingestion type REFRESH");
        }
      }
      if (tableConfig.isDimTable()) {
        Preconditions.checkState(ingestionConfig.getBatchIngestionConfig() != null,
            "Dimension tables must have batch ingestion configuration");
      }

      // Stream
      // stream config map can either be in ingestion config or indexing config. cannot be in both places
      if (ingestionConfig.getStreamIngestionConfig() != null) {
        IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
        Preconditions.checkState(indexingConfig == null || MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
            "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided");
        List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
        Preconditions.checkState(!streamConfigMaps.isEmpty(), "Must have at least 1 stream in REALTIME table");
        // TODO: for multiple stream configs, validate them

        boolean isPauselessEnabled = ingestionConfig.getStreamIngestionConfig().isPauselessConsumptionEnabled();
        if (isPauselessEnabled) {
          int replication = tableConfig.getReplication();
          // We are checking for this only when replication is greater than 1 because in test environments
          // users still prefer to create pauseless tables with replication 1
          if (replication > 1) {
            String peerSegmentDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
            Preconditions.checkState(StringUtils.isNotEmpty(peerSegmentDownloadScheme) && isValidPeerDownloadScheme(
                    peerSegmentDownloadScheme),
                "Must have a valid peerSegmentDownloadScheme set in validation config for pauseless consumption");
          } else {
            LOGGER.warn("It's not recommended to create pauseless tables with replication 1 for stability reasons.");
          }
        }
      }

      // Filter config
      FilterConfig filterConfig = ingestionConfig.getFilterConfig();
      if (filterConfig != null) {
        String filterFunction = filterConfig.getFilterFunction();
        if (filterFunction != null) {
          if (_disableGroovy && FunctionEvaluatorFactory.isGroovyExpression(filterFunction)) {
            throw new IllegalStateException(
                "Groovy filter functions are disabled for table config. Found '" + filterFunction + "'");
          }
          try {
            FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
          } catch (Exception e) {
            throw new IllegalStateException(
                "Invalid filter function '" + filterFunction + "', exception: " + e.getMessage(), e);
          }
        }
      }

      // Aggregation configs
      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
      Set<String> aggregationSourceColumns = new HashSet<>();
      if (!CollectionUtils.isEmpty(aggregationConfigs)) {
        Preconditions.checkState(!tableConfig.getIndexingConfig().isAggregateMetrics(),
            "aggregateMetrics cannot be set with AggregationConfig");
        Set<String> aggregationColumns = new HashSet<>();
        for (AggregationConfig aggregationConfig : aggregationConfigs) {
          String columnName = aggregationConfig.getColumnName();
          String aggregationFunction = aggregationConfig.getAggregationFunction();
          if (columnName == null || aggregationFunction == null) {
            throw new IllegalStateException(
                "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
          }

          FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
          Preconditions.checkState(fieldSpec != null,
              "The destination column '" + columnName + "' of the aggregation function must be present in the schema");
          Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC,
              "The destination column '" + columnName + "' of the aggregation function must be a metric column");

          if (!aggregationColumns.add(columnName)) {
            throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
          }
          ExpressionContext expressionContext;
          try {
            expressionContext = RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
          } catch (Exception e) {
            throw new IllegalStateException(
                "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
          }
          Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
              "aggregation function must be a function for: %s", aggregationConfig);

          FunctionContext functionContext = expressionContext.getFunction();
          AggregationFunctionType functionType =
              AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
          validateIngestionAggregation(functionType);

          List<ExpressionContext> arguments = functionContext.getArguments();
          int numArguments = arguments.size();
          if (functionType == DISTINCTCOUNTHLL) {
            Preconditions.checkState(numArguments >= 1 && numArguments <= 2,
                "DISTINCT_COUNT_HLL can have at most two arguments: %s", aggregationConfig);
            if (numArguments == 2) {
              ExpressionContext secondArgument = arguments.get(1);
              Preconditions.checkState(secondArgument.getType() == ExpressionContext.Type.LITERAL,
                  "Second argument of DISTINCT_COUNT_HLL must be literal: %s", aggregationConfig);
              String literal = secondArgument.getLiteral().getStringValue();
              Preconditions.checkState(StringUtils.isNumeric(literal),
                  "Second argument of DISTINCT_COUNT_HLL must be a number: %s", aggregationConfig);
            }
            DataType dataType = fieldSpec.getDataType();
            Preconditions.checkState(dataType == DataType.BYTES, "Result type for DISTINCT_COUNT_HLL must be BYTES: %s",
                aggregationConfig);
          } else if (functionType == DISTINCTCOUNTHLLPLUS) {
            Preconditions.checkState(numArguments >= 1 && numArguments <= 3,
                "DISTINCT_COUNT_HLL_PLUS can have at most three arguments: %s", aggregationConfig);
            if (numArguments == 2) {
              ExpressionContext secondArgument = arguments.get(1);
              Preconditions.checkState(secondArgument.getType() == ExpressionContext.Type.LITERAL,
                  "Second argument of DISTINCT_COUNT_HLL_PLUS must be literal: %s", aggregationConfig);
              String literal = secondArgument.getLiteral().getStringValue();
              Preconditions.checkState(StringUtils.isNumeric(literal),
                  "Second argument of DISTINCT_COUNT_HLL_PLUS must be a number: %s", aggregationConfig);
            }
            if (numArguments == 3) {
              ExpressionContext thirdArgument = arguments.get(2);
              Preconditions.checkState(thirdArgument.getType() == ExpressionContext.Type.LITERAL,
                  "Third argument of DISTINCT_COUNT_HLL_PLUS must be literal: %s", aggregationConfig);
              String literal = thirdArgument.getLiteral().getStringValue();
              Preconditions.checkState(StringUtils.isNumeric(literal),
                  "Third argument of DISTINCT_COUNT_HLL_PLUS must be a number: %s", aggregationConfig);
            }
            if (fieldSpec != null) {
              DataType dataType = fieldSpec.getDataType();
              Preconditions.checkState(dataType == DataType.BYTES,
                  "Result type for DISTINCT_COUNT_HLL_PLUS must be BYTES: %s", aggregationConfig);
            }
          } else if (functionType == SUMPRECISION) {
            Preconditions.checkState(numArguments >= 2 && numArguments <= 3,
                "SUM_PRECISION must specify precision (required), scale (optional): %s", aggregationConfig);
            ExpressionContext secondArgument = arguments.get(1);
            Preconditions.checkState(secondArgument.getType() == ExpressionContext.Type.LITERAL,
                "Second argument of SUM_PRECISION must be literal: %s", aggregationConfig);
            String literal = secondArgument.getLiteral().getStringValue();
            Preconditions.checkState(StringUtils.isNumeric(literal),
                "Second argument of SUM_PRECISION must be a number: %s", aggregationConfig);
            if (fieldSpec != null) {
              DataType dataType = fieldSpec.getDataType();
              Preconditions.checkState(dataType == DataType.BIG_DECIMAL || dataType == DataType.BYTES,
                  "Result type for DISTINCT_COUNT_HLL must be BIG_DECIMAL or BYTES: %s", aggregationConfig);
            }
          } else {
            Preconditions.checkState(numArguments == 1, "%s can only have one argument: %s", functionType,
                aggregationConfig);
          }
          ExpressionContext firstArgument = arguments.get(0);
          Preconditions.checkState(firstArgument.getType() == ExpressionContext.Type.IDENTIFIER,
              "First argument of aggregation function: %s must be identifier, got: %s", functionType,
              firstArgument.getType());

          aggregationSourceColumns.add(firstArgument.getIdentifier());
        }
        Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
            "all metric columns must be aggregated");

        // This is required by MutableSegmentImpl.enableMetricsAggregationIfPossible().
        // That code will disable ingestion aggregation if all metrics aren't noDictionaryColumns.
        // But if you do that after the table is already created, all future aggregations will
        // just be the default value.
        Map<String, DictionaryIndexConfig> configPerCol = StandardIndexes.dictionary().getConfig(tableConfig, schema);
        aggregationColumns.forEach(column -> {
          DictionaryIndexConfig dictConfig = configPerCol.get(column);
          Preconditions.checkState(dictConfig != null && dictConfig.isDisabled(),
              "Aggregated column: %s must be a no-dictionary column", column);
        });
      }

      // Enrichment configs
      List<EnrichmentConfig> enrichmentConfigs = ingestionConfig.getEnrichmentConfigs();
      if (enrichmentConfigs != null) {
        for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
          RecordEnricherRegistry.validateEnrichmentConfig(enrichmentConfig,
              new RecordEnricherValidationConfig(_disableGroovy));
        }
      }

      // Transform configs
      List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
      if (transformConfigs != null) {
        Set<String> transformColumns = new HashSet<>();
        for (TransformConfig transformConfig : transformConfigs) {
          String columnName = transformConfig.getColumnName();
          String transformFunction = transformConfig.getTransformFunction();
          if (columnName == null || transformFunction == null) {
            throw new IllegalStateException(
                "columnName/transformFunction cannot be null in TransformConfig " + transformConfig);
          }
          if (!transformColumns.add(columnName)) {
            throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
          }
          Preconditions.checkState(schema.hasColumn(columnName) || aggregationSourceColumns.contains(columnName),
              "The destination column '" + columnName
                  + "' of the transform function must be present in the schema or as a source column for "
                  + "aggregations");
          FunctionEvaluator expressionEvaluator;
          if (_disableGroovy && FunctionEvaluatorFactory.isGroovyExpression(transformFunction)) {
            throw new IllegalStateException(
                "Groovy transform functions are disabled for table config. Found '" + transformFunction
                    + "' for column '" + columnName + "'");
          }
          try {
            expressionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(transformFunction);
          } catch (Exception e) {
            throw new IllegalStateException(
                "Invalid transform function '" + transformFunction + "' for column '" + columnName
                    + "', exception: " + e.getMessage(), e);
          }
          List<String> arguments = expressionEvaluator.getArguments();
          if (arguments.contains(columnName)) {
            throw new IllegalStateException(
                "Arguments of a transform function '" + arguments + "' cannot contain the destination column '"
                    + columnName + "'");
          }
        }
      }

      // Complex configs
      ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
      if (complexTypeConfig != null) {
        Map<String, String> prefixesToRename = complexTypeConfig.getPrefixesToRename();
        if (MapUtils.isNotEmpty(prefixesToRename)) {
          Set<String> fieldNames = schema.getColumnNames();
          for (String prefix : prefixesToRename.keySet()) {
            for (String field : fieldNames) {
              Preconditions.checkState(!field.startsWith(prefix),
                  "Fields in the schema may not begin with any prefix specified in the prefixesToRename"
                      + " config. Name conflict with field: " + field + " and prefix: " + prefix);
            }
          }
        }
      }

      SchemaConformingTransformerConfig schemaConformingTransformerConfig =
          ingestionConfig.getSchemaConformingTransformerConfig();
      if (schemaConformingTransformerConfig != null) {
        SchemaConformingTransformer.validateSchema(schema, schemaConformingTransformerConfig);
      }
    }
  }