in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java [115:224]
/**
 * Validates the table schema and the connector options.
 *
 * <p>The checks run in the order below and the first violation aborts with an exception:
 *
 * <ol>
 *   <li>the first field is named {@code Time_} and has the data type {@code BIGINT};
 *   <li>every other field name starts with {@code root.} and no path node of any field name is
 *       a pure number;
 *   <li>every field data type is contained in {@code supportedDataTypes};
 *   <li>every entry of the comma-separated node URL option has the form {@code host:port} with a
 *       port in {@code [1, 65535]};
 *   <li>when both scan bounds are positive, the upper bound is not smaller than the lower bound;
 *   <li>for {@code Type.SOURCE} only: the options required by the configured mode are present
 *       and, in {@code BOUNDED} mode, the SQL passes the basic-query filter.
 * </ol>
 *
 * @param options connector options to validate
 * @param schema table schema whose field names and data types are validated
 * @param type kind of table being validated; mode-specific checks run only for {@code
 *     Type.SOURCE}
 * @throws IllegalSchemaException if the schema is empty or its first field is not {@code Time_
 *     BIGINT}
 * @throws IllegalIoTDBPathException if a field name is not a valid path for this connector
 * @throws UnsupportedDataTypeException if a field uses a data type outside {@code
 *     supportedDataTypes}
 * @throws IllegalUrlPathException if a node URL is malformed or its port is out of range
 * @throws IllegalOptionException if the scan bounds are inconsistent or a mode-specific option
 *     is missing or unsupported
 */
protected void validate(ReadableConfig options, TableSchema schema, Type type) {
  String[] fieldNames = schema.getFieldNames();
  DataType[] fieldDataTypes = schema.getFieldDataTypes();

  validateTimeColumn(fieldNames, fieldDataTypes);
  validateFieldNames(fieldNames);
  validateFieldDataTypes(fieldDataTypes);
  validateNodeUrls(options.get(Options.NODE_URLS));
  validateScanBounds(options);
  if (type == Type.SOURCE) {
    validateSourceOptions(options);
  }
}

/** Ensures the schema is non-empty and starts with the {@code Time_ BIGINT} column. */
private void validateTimeColumn(String[] fieldNames, DataType[] fieldDataTypes) {
  // The length checks turn an empty schema into the documented schema error instead of an
  // ArrayIndexOutOfBoundsException.
  if (fieldNames.length == 0
      || fieldDataTypes.length == 0
      || !"Time_".equals(fieldNames[0])
      || !fieldDataTypes[0].equals(DataTypes.BIGINT())) {
    throw new IllegalSchemaException(
        "The first field's name must be `Time_`, and its data type must be BIGINT.");
  }
}

/** Ensures every field name is a {@code root.}-prefixed path without purely numeric nodes. */
private void validateFieldNames(String[] fieldNames) {
  for (String fieldName : fieldNames) {
    if (!"Time_".equals(fieldName) && !fieldName.startsWith("root.")) {
      throw new IllegalIoTDBPathException(
          String.format("The field name `%s` doesn't start with 'root.'.", fieldName));
    }
    // The node check is applied to every field name, including `Time_`.
    try {
      for (String node : PathUtils.splitPathToDetachedNodes(fieldName)) {
        if (Utils.isNumeric(node)) {
          throw new IllegalIoTDBPathException(
              String.format(
                  "The node `%s` in the field name `%s` is a pure number, which is not allowed in IoTDB.",
                  node, fieldName));
        }
      }
    } catch (IllegalPathException e) {
      // NOTE(review): only the message is propagated; if IllegalIoTDBPathException offers a
      // (message, cause) constructor, passing `e` along would keep the stack trace.
      throw new IllegalIoTDBPathException(e.getMessage());
    }
  }
}

/** Ensures every field data type is one of {@code supportedDataTypes}. */
private void validateFieldDataTypes(DataType[] fieldDataTypes) {
  for (DataType fieldDataType : fieldDataTypes) {
    if (!supportedDataTypes.contains(fieldDataType)) {
      throw new UnsupportedDataTypeException(
          "IoTDB doesn't support the data type: " + fieldDataType);
    }
  }
}

/** Ensures every comma-separated entry is {@code host:port} with a port in [1, 65535]. */
private void validateNodeUrls(String nodeUrlsOption) {
  for (String nodeUrl : nodeUrlsOption.split(",")) {
    String[] hostAndPort = nodeUrl.split(":");
    if (hostAndPort.length != 2) {
      throw new IllegalUrlPathException("Every node's URL must be in the format of `host:port`.");
    }

    String portText = hostAndPort[1];
    if (!Utils.isNumeric(portText)) {
      throw new IllegalUrlPathException(
          String.format("The port in url %s must be a number.", nodeUrl));
    }

    int port;
    try {
      port = Integer.parseInt(portText);
    } catch (NumberFormatException e) {
      // Text that passed Utils.isNumeric can still be unparsable as an int (for example a digit
      // string beyond the int range); report it as a URL error rather than leaking the
      // NumberFormatException.
      throw new IllegalUrlPathException(
          String.format(
              "The port in url %s must be an integer between 1 and 65535.", nodeUrl));
    }

    if (port > 65535) {
      throw new IllegalUrlPathException(
          String.format("The port in url %s must be smaller than 65536", nodeUrl));
    }
    if (port < 1) {
      throw new IllegalUrlPathException(
          String.format("The port in url %s must be greater than 0.", nodeUrl));
    }
  }
}

/** Ensures the scan bounds are ordered whenever both of them are positive. */
private void validateScanBounds(ReadableConfig options) {
  Long lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND);
  Long upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND);
  // The null checks avoid a NullPointerException from auto-unboxing.
  // NOTE(review): presumably both options declare defaults, making the null case unreachable;
  // confirm against the Options class.
  if (lowerBound == null || upperBound == null) {
    return;
  }
  // The ordering is only enforced when both bounds are positive; a non-positive bound is skipped
  // (presumably meaning "not set" - confirm against the Options defaults).
  if (lowerBound > 0L && upperBound > 0L && upperBound < lowerBound) {
    throw new IllegalOptionException(
        "The value of option `scan.bounded.lower-bound` could not be greater than the value of option `scan.bounded.upper-bound`.");
  }
}

/** Ensures the options required by the configured source mode are present and usable. */
private void validateSourceOptions(ReadableConfig options) {
  Options.Mode mode = options.get(Options.MODE);
  if (mode == Options.Mode.CDC) {
    if (options.get(Options.CDC_TASK_NAME) == null) {
      throw new IllegalOptionException(
          "The option `cdc.task.name` is required when option `mode` equals `CDC`");
    }
    if (options.get(Options.PATTERN) == null) {
      throw new IllegalOptionException(
          "The option `cdc.pattern` is required when option `mode` equals `CDC`");
    }
  } else if (mode == Options.Mode.BOUNDED) {
    String sql = options.get(Options.SQL);
    if (sql == null) {
      throw new IllegalOptionException(
          "The option `sql` is required when option `mode` equals `BOUNDED`");
    }
    validateBoundedSql(sql);
  }
}

/**
 * Rejects SQL that lacks {@code select} or contains any unsupported fragment.
 *
 * <p>Matching is case-insensitive and by plain substring, so a fragment is also matched when it
 * appears inside an identifier.
 */
private void validateBoundedSql(String sql) {
  // "count" also covers "count(", so no separate entry is needed for the call form.
  String[] unsupportedFragments = {
    "sum(",
    "avg(",
    "extreme(",
    "max_value(",
    "min_value(",
    "first_value(",
    "last_value(",
    "max_time(",
    "min_time(",
    "max_by(",
    "min_by(",
    "stddev(",
    "stddev_pop(",
    "stddev_samp(",
    "variance(",
    "var_pop(",
    "var_samp(",
    "group",
    "where",
    "create",
    "count",
    "delete",
    "show"
  };

  // Locale.ROOT keeps the lower-casing independent of the JVM default locale; under a Turkish
  // locale `I` does not map to `i`, which would let e.g. `MIN_VALUE(` slip past the filter.
  String sqlLower = sql.toLowerCase(java.util.Locale.ROOT);

  boolean rejected = !sqlLower.contains("select");
  for (String fragment : unsupportedFragments) {
    if (rejected) {
      break;
    }
    rejected = sqlLower.contains(fragment);
  }
  if (rejected) {
    throw new IllegalOptionException("The option `sql` only supports basic query statements.");
  }
}