in pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java [334:596]
/**
 * Validates the {@link IngestionConfig} of a table config against its schema. Does nothing when the table config
 * has no ingestion config.
 *
 * <p>The following sections are checked, in this order:
 * <ul>
 *   <li>Batch: a {@link BatchConfig} can be constructed from every batch config map; dimension tables must have a
 *   batch ingestion config whose segment ingestion type is REFRESH (case-insensitive)</li>
 *   <li>Stream: stream configs are not also set in the indexing config; at least one stream config map is
 *   present; pauseless consumption with replication greater than 1 requires a valid peer segment download
 *   scheme</li>
 *   <li>Filter: an expression evaluator can be created for the filter function, and the function is not a Groovy
 *   expression when Groovy is disabled</li>
 *   <li>Aggregation: every config targets a distinct metric column of the schema, uses a supported aggregation
 *   function with well-formed arguments, all metric columns are aggregated, and every aggregated column has its
 *   dictionary disabled</li>
 *   <li>Enrichment: delegated to {@code RecordEnricherRegistry.validateEnrichmentConfig}</li>
 *   <li>Transform: every config targets a distinct column that is in the schema or is an aggregation source
 *   column, an expression evaluator can be created for the function, and the function does not read its own
 *   destination column</li>
 *   <li>Complex type: no schema column starts with a prefix listed in prefixesToRename</li>
 *   <li>Schema conforming transformer: delegated to {@code SchemaConformingTransformer.validateSchema}</li>
 * </ul>
 *
 * @param tableConfig table config whose ingestion config is validated
 * @param schema schema the ingestion config is validated against
 * @throws IllegalStateException if one of the checks performed directly by this method fails; the helpers this
 *     method delegates to may throw other exception types
 */
public static void validateIngestionConfig(TableConfig tableConfig, Schema schema) {
  IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
  if (ingestionConfig == null) {
    return;
  }
  String tableNameWithType = tableConfig.getTableName();

  // Batch
  BatchIngestionConfig batchIngestionConfig = ingestionConfig.getBatchIngestionConfig();
  if (batchIngestionConfig != null) {
    List<Map<String, String>> batchConfigMaps = batchIngestionConfig.getBatchConfigMaps();
    try {
      if (CollectionUtils.isNotEmpty(batchConfigMaps)) {
        // Validate that BatchConfig can be created
        batchConfigMaps.forEach(b -> new BatchConfig(tableNameWithType, b));
      }
    } catch (Exception e) {
      throw new IllegalStateException("Could not create BatchConfig using the batchConfig map", e);
    }
  }
  if (tableConfig.isDimTable()) {
    Preconditions.checkState(batchIngestionConfig != null,
        "Dimension tables must have batch ingestion configuration");
    // Constant-first comparison: a missing ingestion type fails this check instead of throwing an NPE
    Preconditions.checkState("REFRESH".equalsIgnoreCase(batchIngestionConfig.getSegmentIngestionType()),
        "Dimension tables must have segment ingestion type REFRESH");
  }

  // Stream
  // stream config map can either be in ingestion config or indexing config. cannot be in both places
  if (ingestionConfig.getStreamIngestionConfig() != null) {
    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
    Preconditions.checkState(indexingConfig == null || MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
        "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided");
    List<Map<String, String>> streamConfigMaps =
        ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
    // Null-safe emptiness check: a missing list is reported the same way as an empty one
    Preconditions.checkState(CollectionUtils.isNotEmpty(streamConfigMaps),
        "Must have at least 1 stream in REALTIME table");
    // TODO: for multiple stream configs, validate them
    boolean isPauselessEnabled = ingestionConfig.getStreamIngestionConfig().isPauselessConsumptionEnabled();
    if (isPauselessEnabled) {
      int replication = tableConfig.getReplication();
      // We are checking for this only when replication is greater than 1 because in test environments
      // users still prefer to create pauseless tables with replication 1
      if (replication > 1) {
        String peerSegmentDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
        Preconditions.checkState(
            StringUtils.isNotEmpty(peerSegmentDownloadScheme) && isValidPeerDownloadScheme(
                peerSegmentDownloadScheme),
            "Must have a valid peerSegmentDownloadScheme set in validation config for pauseless consumption");
      } else {
        LOGGER.warn("It's not recommended to create pauseless tables with replication 1 for stability reasons.");
      }
    }
  }

  // Filter config
  FilterConfig filterConfig = ingestionConfig.getFilterConfig();
  if (filterConfig != null) {
    String filterFunction = filterConfig.getFilterFunction();
    if (filterFunction != null) {
      if (_disableGroovy && FunctionEvaluatorFactory.isGroovyExpression(filterFunction)) {
        throw new IllegalStateException(
            "Groovy filter functions are disabled for table config. Found '" + filterFunction + "'");
      }
      try {
        FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction);
      } catch (Exception e) {
        throw new IllegalStateException(
            "Invalid filter function '" + filterFunction + "', exception: " + e.getMessage(), e);
      }
    }
  }

  // Aggregation configs
  List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
  // Source (input) columns of the aggregations; transform configs are allowed to target these columns
  Set<String> aggregationSourceColumns = new HashSet<>();
  if (CollectionUtils.isNotEmpty(aggregationConfigs)) {
    Preconditions.checkState(!tableConfig.getIndexingConfig().isAggregateMetrics(),
        "aggregateMetrics cannot be set with AggregationConfig");
    // Destination columns of the aggregations
    Set<String> aggregationColumns = new HashSet<>();
    for (AggregationConfig aggregationConfig : aggregationConfigs) {
      String columnName = aggregationConfig.getColumnName();
      String aggregationFunction = aggregationConfig.getAggregationFunction();
      if (columnName == null || aggregationFunction == null) {
        throw new IllegalStateException(
            "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
      }
      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
      Preconditions.checkState(fieldSpec != null,
          "The destination column '" + columnName + "' of the aggregation function must be present in the schema");
      Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC,
          "The destination column '" + columnName + "' of the aggregation function must be a metric column");
      if (!aggregationColumns.add(columnName)) {
        throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
      }
      ExpressionContext expressionContext;
      try {
        expressionContext = RequestContextUtils.getExpression(aggregationFunction);
      } catch (Exception e) {
        throw new IllegalStateException(
            "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
      }
      Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
          "aggregation function must be a function for: %s", aggregationConfig);
      FunctionContext functionContext = expressionContext.getFunction();
      AggregationFunctionType functionType =
          AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
      validateIngestionAggregation(functionType);
      List<ExpressionContext> arguments = functionContext.getArguments();
      int numArguments = arguments.size();
      // fieldSpec is known to be non-null here (checked above), so the result type can be read directly
      DataType dataType = fieldSpec.getDataType();
      if (functionType == DISTINCTCOUNTHLL) {
        Preconditions.checkState(numArguments >= 1 && numArguments <= 2,
            "DISTINCT_COUNT_HLL can have at most two arguments: %s", aggregationConfig);
        if (numArguments == 2) {
          ExpressionContext secondArgument = arguments.get(1);
          Preconditions.checkState(secondArgument.getType() == ExpressionContext.Type.LITERAL,
              "Second argument of DISTINCT_COUNT_HLL must be literal: %s", aggregationConfig);
          String literal = secondArgument.getLiteral().getStringValue();
          Preconditions.checkState(StringUtils.isNumeric(literal),
              "Second argument of DISTINCT_COUNT_HLL must be a number: %s", aggregationConfig);
        }
        Preconditions.checkState(dataType == DataType.BYTES,
            "Result type for DISTINCT_COUNT_HLL must be BYTES: %s", aggregationConfig);
      } else if (functionType == DISTINCTCOUNTHLLPLUS) {
        Preconditions.checkState(numArguments >= 1 && numArguments <= 3,
            "DISTINCT_COUNT_HLL_PLUS can have at most three arguments: %s", aggregationConfig);
        // The second argument must be validated whenever it is present, including the three-argument form
        if (numArguments >= 2) {
          ExpressionContext secondArgument = arguments.get(1);
          Preconditions.checkState(secondArgument.getType() == ExpressionContext.Type.LITERAL,
              "Second argument of DISTINCT_COUNT_HLL_PLUS must be literal: %s", aggregationConfig);
          String literal = secondArgument.getLiteral().getStringValue();
          Preconditions.checkState(StringUtils.isNumeric(literal),
              "Second argument of DISTINCT_COUNT_HLL_PLUS must be a number: %s", aggregationConfig);
        }
        if (numArguments == 3) {
          ExpressionContext thirdArgument = arguments.get(2);
          Preconditions.checkState(thirdArgument.getType() == ExpressionContext.Type.LITERAL,
              "Third argument of DISTINCT_COUNT_HLL_PLUS must be literal: %s", aggregationConfig);
          String literal = thirdArgument.getLiteral().getStringValue();
          Preconditions.checkState(StringUtils.isNumeric(literal),
              "Third argument of DISTINCT_COUNT_HLL_PLUS must be a number: %s", aggregationConfig);
        }
        Preconditions.checkState(dataType == DataType.BYTES,
            "Result type for DISTINCT_COUNT_HLL_PLUS must be BYTES: %s", aggregationConfig);
      } else if (functionType == SUMPRECISION) {
        Preconditions.checkState(numArguments >= 2 && numArguments <= 3,
            "SUM_PRECISION must specify precision (required), scale (optional): %s", aggregationConfig);
        ExpressionContext secondArgument = arguments.get(1);
        Preconditions.checkState(secondArgument.getType() == ExpressionContext.Type.LITERAL,
            "Second argument of SUM_PRECISION must be literal: %s", aggregationConfig);
        String literal = secondArgument.getLiteral().getStringValue();
        Preconditions.checkState(StringUtils.isNumeric(literal),
            "Second argument of SUM_PRECISION must be a number: %s", aggregationConfig);
        Preconditions.checkState(dataType == DataType.BIG_DECIMAL || dataType == DataType.BYTES,
            "Result type for SUM_PRECISION must be BIG_DECIMAL or BYTES: %s", aggregationConfig);
      } else {
        Preconditions.checkState(numArguments == 1, "%s can only have one argument: %s", functionType,
            aggregationConfig);
      }
      // Every branch above guarantees at least one argument
      ExpressionContext firstArgument = arguments.get(0);
      Preconditions.checkState(firstArgument.getType() == ExpressionContext.Type.IDENTIFIER,
          "First argument of aggregation function: %s must be identifier, got: %s", functionType,
          firstArgument.getType());
      aggregationSourceColumns.add(firstArgument.getIdentifier());
    }
    Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
        "all metric columns must be aggregated");
    // This is required by MutableSegmentImpl.enableMetricsAggregationIfPossible().
    // That code will disable ingestion aggregation if all metrics aren't noDictionaryColumns.
    // But if you do that after the table is already created, all future aggregations will
    // just be the default value.
    Map<String, DictionaryIndexConfig> configPerCol = StandardIndexes.dictionary().getConfig(tableConfig, schema);
    aggregationColumns.forEach(column -> {
      DictionaryIndexConfig dictConfig = configPerCol.get(column);
      Preconditions.checkState(dictConfig != null && dictConfig.isDisabled(),
          "Aggregated column: %s must be a no-dictionary column", column);
    });
  }

  // Enrichment configs
  List<EnrichmentConfig> enrichmentConfigs = ingestionConfig.getEnrichmentConfigs();
  if (enrichmentConfigs != null) {
    for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
      RecordEnricherRegistry.validateEnrichmentConfig(enrichmentConfig,
          new RecordEnricherValidationConfig(_disableGroovy));
    }
  }

  // Transform configs
  List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
  if (transformConfigs != null) {
    Set<String> transformColumns = new HashSet<>();
    for (TransformConfig transformConfig : transformConfigs) {
      String columnName = transformConfig.getColumnName();
      String transformFunction = transformConfig.getTransformFunction();
      if (columnName == null || transformFunction == null) {
        throw new IllegalStateException(
            "columnName/transformFunction cannot be null in TransformConfig " + transformConfig);
      }
      if (!transformColumns.add(columnName)) {
        throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
      }
      Preconditions.checkState(schema.hasColumn(columnName) || aggregationSourceColumns.contains(columnName),
          "The destination column '" + columnName
              + "' of the transform function must be present in the schema or as a source column for "
              + "aggregations");
      if (_disableGroovy && FunctionEvaluatorFactory.isGroovyExpression(transformFunction)) {
        throw new IllegalStateException(
            "Groovy transform functions are disabled for table config. Found '" + transformFunction
                + "' for column '" + columnName + "'");
      }
      FunctionEvaluator expressionEvaluator;
      try {
        expressionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(transformFunction);
      } catch (Exception e) {
        throw new IllegalStateException(
            "Invalid transform function '" + transformFunction + "' for column '" + columnName
                + "', exception: " + e.getMessage(), e);
      }
      // A transform must not take its own destination column as an input
      List<String> arguments = expressionEvaluator.getArguments();
      if (arguments.contains(columnName)) {
        throw new IllegalStateException(
            "Arguments of a transform function '" + arguments + "' cannot contain the destination column '"
                + columnName + "'");
      }
    }
  }

  // Complex configs
  ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
  if (complexTypeConfig != null) {
    Map<String, String> prefixesToRename = complexTypeConfig.getPrefixesToRename();
    if (MapUtils.isNotEmpty(prefixesToRename)) {
      Set<String> fieldNames = schema.getColumnNames();
      for (String prefix : prefixesToRename.keySet()) {
        for (String field : fieldNames) {
          Preconditions.checkState(!field.startsWith(prefix),
              "Fields in the schema may not begin with any prefix specified in the prefixesToRename"
                  + " config. Name conflict with field: " + field + " and prefix: " + prefix);
        }
      }
    }
  }

  // Schema conforming transformer config
  SchemaConformingTransformerConfig schemaConformingTransformerConfig =
      ingestionConfig.getSchemaConformingTransformerConfig();
  if (schemaConformingTransformerConfig != null) {
    SchemaConformingTransformer.validateSchema(schema, schemaConformingTransformerConfig);
  }
}