protected void validate()

in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java [115:224]


  protected void validate(ReadableConfig options, TableSchema schema, Type type) {
    String[] fieldNames = schema.getFieldNames();
    DataType[] fieldDataTypes = schema.getFieldDataTypes();

    if (!"Time_".equals(fieldNames[0]) || !fieldDataTypes[0].equals(DataTypes.BIGINT())) {
      throw new IllegalSchemaException(
          "The first field's name must be `Time_`, and its data type must be BIGINT.");
    }
    for (String fieldName : fieldNames) {
      if (!"Time_".equals(fieldName) && !fieldName.startsWith("root.")) {
        throw new IllegalIoTDBPathException(
            String.format("The field name `%s` doesn't start with 'root.'.", fieldName));
      }
      try {
        String[] nodes = PathUtils.splitPathToDetachedNodes(fieldName);
        for (String node : nodes) {
          if (Utils.isNumeric(node)) {
            throw new IllegalIoTDBPathException(
                String.format(
                    "The node `%s` in the field name `%s` is a pure number, which is not allowed in IoTDB.",
                    node, fieldName));
          }
        }
      } catch (IllegalPathException e) {
        throw new IllegalIoTDBPathException(e.getMessage());
      }
    }

    for (DataType fieldDataType : fieldDataTypes) {
      if (!supportedDataTypes.contains(fieldDataType)) {
        throw new UnsupportedDataTypeException(
            "IoTDB doesn't support the data type: " + fieldDataType);
      }
    }

    String[] nodeUrls = options.get(Options.NODE_URLS).split(",");
    for (String nodeUrl : nodeUrls) {
      String[] split = nodeUrl.split(":");
      if (split.length != 2) {
        throw new IllegalUrlPathException("Every node's URL must be in the format of `host:port`.");
      }
      if (!Utils.isNumeric(split[1])) {
        throw new IllegalUrlPathException(
            String.format("The port in url %s must be a number.", nodeUrl));
      } else {
        int port = Integer.parseInt(split[1]);
        if (port > 65535) {
          throw new IllegalUrlPathException(
              String.format("The port in url %s must be smaller than 65536", nodeUrl));
        } else if (port < 1) {
          throw new IllegalUrlPathException(
              String.format("The port in url %s must be greater than 0.", nodeUrl));
        }
      }
    }

    Long lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND);
    Long upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND);
    if (lowerBound > 0L && upperBound > 0L && upperBound < lowerBound) {
      throw new IllegalOptionException(
          "The value of option `scan.bounded.lower-bound` could not be greater than the value of option `scan.bounded.upper-bound`.");
    }

    if (type == Type.SOURCE) {
      if (options.get(Options.MODE) == Options.Mode.CDC) {
        if (options.get(Options.CDC_TASK_NAME) == null) {
          throw new IllegalOptionException(
              "The option `cdc.task.name` is required when option `mode` equals `CDC`");
        }
        if (options.get(Options.PATTERN) == null) {
          throw new IllegalOptionException(
              "The option `cdc.pattern` is required when option `mode` equals `CDC`");
        }
      } else if (options.get(Options.MODE) == Options.Mode.BOUNDED) {
        if ((options.get(Options.SQL) == null)) {
          throw new IllegalOptionException(
              "The option `sql` is required when option `mode` equals `BOUNDED`");
        }
        String sqlLower = options.get(Options.SQL).toLowerCase();
        if (!sqlLower.contains("select")
            || sqlLower.contains("count(")
            || sqlLower.contains("sum(")
            || sqlLower.contains("avg(")
            || sqlLower.contains("extreme(")
            || sqlLower.contains("max_value(")
            || sqlLower.contains("min_value(")
            || sqlLower.contains("first_value(")
            || sqlLower.contains("last_value(")
            || sqlLower.contains("max_time(")
            || sqlLower.contains("min_time(")
            || sqlLower.contains("max_by(")
            || sqlLower.contains("min_by(")
            || sqlLower.contains("stddev(")
            || sqlLower.contains("stddev_pop(")
            || sqlLower.contains("stddev_samp(")
            || sqlLower.contains("variance(")
            || sqlLower.contains("var_pop(")
            || sqlLower.contains("var_samp(")
            || sqlLower.contains("group")
            || sqlLower.contains("where")
            || sqlLower.contains("create")
            || sqlLower.contains("count")
            || sqlLower.contains("delete")
            || sqlLower.contains("show")) {
          throw new IllegalOptionException(
              "The option `sql` only supports basic query statements.");
        }
      }
    }
  }