private void validateOutputRecords()

in samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java [180:258]


  /**
   * Validates that the fields projected by the select query line up with the output schema.
   *
   * <p>Two passes are made:
   * <ol>
   *   <li>Every projected field must exist in the output schema, and its type must pass
   *   {@code compareFieldTypes} against the output field. A projected field named {@code <name>0} that is not
   *   itself in the output schema is treated as a Calcite-renamed duplicate of {@code <name>} (and validated
   *   against the output field {@code <name>}) only when {@code <name>} is also a projected field.</li>
   *   <li>Every output field must either be projected with a type that passes {@code compareFieldTypes}, or be
   *   optional (per its {@code SqlFieldSchema} or {@code isOptional}).</li>
   * </ol>
   *
   * @param outputSqlSchema Samza SQL schema of the output; supplies the per-field {@code SqlFieldSchema}s
   * @param outputRecord Calcite record type of the output
   * @param projectRecord Calcite record type of the select query's projection
   * @param outputRelSchemaProvider schema provider for the output, passed to the type comparison and
   *     optionality checks
   * @throws SamzaSqlValidatorException if a projected field has no matching output field, a non-optional output
   *     field is not projected, or a projected field's type does not match the output field's type
   */
  private void validateOutputRecords(SqlSchema outputSqlSchema, RelRecordType outputRecord,
      RelRecordType projectRecord, RelSchemaProvider outputRelSchemaProvider)
      throws SamzaSqlValidatorException {
    Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
        Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
    Map<String, SqlFieldSchema> outputFieldSchemaMap = outputSqlSchema.getFields().stream().collect(
        Collectors.toMap(SqlSchema.SqlField::getFieldName, SqlSchema.SqlField::getFieldSchema));
    Map<String, RelDataType> projectedRecordMap = projectRecord.getFieldList().stream().collect(
        Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));

    // Ensure that all fields from sql statement exist in the output schema and are of the same type.
    for (Map.Entry<String, RelDataType> entry : projectedRecordMap.entrySet()) {
      String projectedFieldName = entry.getKey();
      RelDataType outputFieldType = outputRecordMap.get(projectedFieldName);
      SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(projectedFieldName);

      if (outputFieldType == null) {
        // If the field names are specified more than once in the select query, calcite appends 'n' as suffix to the
        // dup fields based on the order they are specified, where 'n' starts from 0 for the first dup field.
        // Take the following example: SELECT id as str, secondaryId as str, tertiaryId as str FROM store.myTable
        //   Calcite renames the projected fieldNames in select query as str, str0, str1 respectively.
        // Samza Sql allows a field name to be specified up to 2 times. Do the validation accordingly.

        // This type of pattern is typically followed when users want to just modify one field in the input table while
        // keeping rest of the fields the same. Eg: SELECT myUdf(id) as id, * from store.myTable
        //
        // A renamed duplicate "<name>0" only exists alongside the original "<name>", so require the base name to be
        // projected too. Otherwise a plain alias such as "AS id0" (with no "id" projected) would be silently
        // validated against the output field "id" instead of being rejected below.
        if (projectedFieldName.endsWith("0")) {
          String baseFieldName = StringUtils.chop(projectedFieldName);
          if (projectedRecordMap.containsKey(baseFieldName)) {
            outputFieldType = outputRecordMap.get(baseFieldName);
            outputSqlFieldSchema = outputFieldSchemaMap.get(baseFieldName);
          }
        }

        if (outputFieldType == null) {
          String errMsg = String.format("Field '%s' in select query does not match any field in output schema.", entry.getKey());
          LOG.error(errMsg);
          throw new SamzaSqlValidatorException(errMsg);
        }
      }

      Validate.notNull(outputFieldType);
      Validate.notNull(outputSqlFieldSchema);

      RelDataType calciteSqlType = getCalciteSqlFieldType(entry.getValue());
      if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, calciteSqlType, outputRelSchemaProvider)) {
        String errMsg = String.format("Field '%s' with type '%s' (calciteSqlType:'%s') in select query does not match "
                + "the field type '%s' in output schema.", entry.getKey(), entry.getValue(), calciteSqlType,
            outputSqlFieldSchema.getFieldType());
        LOG.error(errMsg);
        throw new SamzaSqlValidatorException(errMsg);
      }
    }

    // Ensure that all non-optional fields in output schema are set in the sql query and are of the
    // same type.
    for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
      RelDataType projectedFieldType = projectedRecordMap.get(entry.getKey());
      SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());

      if (projectedFieldType == null) {
        // If an output schema field is not found in the sql query, ignore it if the field is optional.
        // Otherwise, throw an error.
        if (outputSqlFieldSchema.isOptional() || isOptional(outputRelSchemaProvider, entry.getKey(), projectRecord)) {
          continue;
        }
        String errMsg = String.format("Non-optional field '%s' in output schema is missing in projected fields of "
            + "select query.", entry.getKey());
        LOG.error(errMsg);
        throw new SamzaSqlValidatorException(errMsg);
      } else {
        RelDataType calciteSqlType = getCalciteSqlFieldType(projectedFieldType);
        if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, calciteSqlType, outputRelSchemaProvider)) {
          String errMsg = String.format("Field '%s' with type '%s' in output schema does not match the field"
                  + " type '%s' (calciteType:'%s') in projected fields.", entry.getKey(),
              outputSqlFieldSchema.getFieldType(), projectedFieldType, calciteSqlType);
          LOG.error(errMsg);
          throw new SamzaSqlValidatorException(errMsg);
        }
      }
    }
  }