in samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java [180:258]
/**
 * Cross-checks the fields projected by a select query against the fields of the output schema.
 *
 * <p>Two passes are made. The first walks the projected fields and requires each one to resolve to an
 * output schema field whose type is accepted by {@code compareFieldTypes}. The second walks the output
 * schema fields: a field that is absent from the projection must be optional, and a field that is
 * present must have a type accepted by {@code compareFieldTypes}.
 *
 * @param outputSqlSchema output schema, used to look up the {@code SqlFieldSchema} of each field by name
 * @param outputRecord record type of the output, used to look up each output field's type by name
 * @param projectRecord record type of the fields projected by the select query
 * @param outputRelSchemaProvider schema provider handed to the type comparison and optionality checks
 * @throws SamzaSqlValidatorException if a projected field has no counterpart in the output schema, if a
 *     non-optional output field is not projected, or if the types of a matched pair do not agree
 */
private void validateOutputRecords(SqlSchema outputSqlSchema, RelRecordType outputRecord,
    RelRecordType projectRecord, RelSchemaProvider outputRelSchemaProvider)
    throws SamzaSqlValidatorException {
  // Name-indexed views of the three inputs.
  Map<String, RelDataType> outputTypesByName = outputRecord.getFieldList()
      .stream()
      .collect(Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
  Map<String, SqlFieldSchema> outputSchemasByName = outputSqlSchema.getFields()
      .stream()
      .collect(Collectors.toMap(SqlSchema.SqlField::getFieldName, SqlSchema.SqlField::getFieldSchema));
  Map<String, RelDataType> projectedTypesByName = projectRecord.getFieldList()
      .stream()
      .collect(Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));

  // Pass 1: every field of the select query must exist in the output schema with a matching type.
  for (Map.Entry<String, RelDataType> projected : projectedTypesByName.entrySet()) {
    String selectName = projected.getKey();
    RelDataType selectType = projected.getValue();

    // When a name appears more than once in the select list, calcite keeps the first occurrence as is
    // and appends a counter starting at 0 to the later ones. For example,
    //   SELECT id as str, secondaryId as str, tertiaryId as str FROM store.myTable
    // is projected as str, str0, str1. Samza Sql accepts a name being used up to 2 times, so only the
    // '0' suffix is stripped before retrying the lookup. The usual reason for such a query is to
    // override a single column while passing the rest through, e.g.
    //   SELECT myUdf(id) as id, * from store.myTable
    String lookupName = selectName;
    if (outputTypesByName.get(lookupName) == null && lookupName.endsWith("0")) {
      lookupName = StringUtils.chop(lookupName);
    }

    RelDataType outputType = outputTypesByName.get(lookupName);
    if (outputType == null) {
      // The message reports the name as it appears in the projection, not the stripped one.
      String errMsg =
          String.format("Field '%s' in select query does not match any field in output schema.", selectName);
      LOG.error(errMsg);
      throw new SamzaSqlValidatorException(errMsg);
    }

    SqlFieldSchema outputFieldSchema = outputSchemasByName.get(lookupName);
    Validate.notNull(outputFieldSchema);

    RelDataType calciteSqlType = getCalciteSqlFieldType(selectType);
    boolean typesAgree =
        compareFieldTypes(outputType, outputFieldSchema, calciteSqlType, outputRelSchemaProvider);
    if (!typesAgree) {
      String errMsg = String.format("Field '%s' with type '%s' (calciteSqlType:'%s') in select query does not match "
          + "the field type '%s' in output schema.", selectName, selectType, calciteSqlType,
          outputFieldSchema.getFieldType());
      LOG.error(errMsg);
      throw new SamzaSqlValidatorException(errMsg);
    }
  }

  // Pass 2: every output schema field must either be projected with a matching type or be optional.
  for (Map.Entry<String, RelDataType> output : outputTypesByName.entrySet()) {
    String outputName = output.getKey();
    SqlFieldSchema outputFieldSchema = outputSchemasByName.get(outputName);
    RelDataType selectType = projectedTypesByName.get(outputName);

    if (selectType != null) {
      // The field is projected: only its type remains to be checked.
      RelDataType calciteSqlType = getCalciteSqlFieldType(selectType);
      if (!compareFieldTypes(output.getValue(), outputFieldSchema, calciteSqlType, outputRelSchemaProvider)) {
        String errMsg = String.format("Field '%s' with type '%s' in output schema does not match the field"
            + " type '%s' (calciteType:'%s') in projected fields.", outputName,
            outputFieldSchema.getFieldType(), selectType, calciteSqlType);
        LOG.error(errMsg);
        throw new SamzaSqlValidatorException(errMsg);
      }
      continue;
    }

    // The field is not projected: that is acceptable only when it is optional.
    boolean mayBeOmitted = outputFieldSchema.isOptional()
        || isOptional(outputRelSchemaProvider, outputName, projectRecord);
    if (!mayBeOmitted) {
      String errMsg = String.format("Non-optional field '%s' in output schema is missing in projected fields of "
          + "select query.", outputName);
      LOG.error(errMsg);
      throw new SamzaSqlValidatorException(errMsg);
    }
  }
}