public static TableSchema getTableSchema()

in holo-client/src/main/java/com/alibaba/hologres/client/impl/util/ConnectionUtil.java [359:488]


	/**
	 * Builds the full {@link TableSchema} for a Hologres table by combining JDBC metadata
	 * (primary keys and columns), the partition column, table properties from
	 * {@code hologres.hg_table_properties}, and (if partitioned) the auto-partitioning
	 * configuration from {@code hologres.hg_partitioning_config}.
	 *
	 * @param conn      an open JDBC connection to the Hologres database
	 * @param tableName the schema-qualified table to describe
	 * @return the resolved table schema; never null
	 * @throws SQLException if the table does not exist, lacks a table_id / schema_version
	 *                      property, has an empty distribution_key, or any query fails
	 */
	public static TableSchema getTableSchema(Connection conn, TableName tableName) throws SQLException {
		DatabaseMetaData metaData = conn.getMetaData();

		// Collect primary-key column names (column 4 of getPrimaryKeys is COLUMN_NAME);
		// used below to flag each Column as primary key or not.
		List<String> primaryKeyList = new ArrayList<>();
		try (ResultSet rs = metaData.getPrimaryKeys(null, tableName.getSchemaName(), tableName.getTableName())) {
			while (rs.next()) {
				primaryKeyList.add(rs.getString(4));
			}
		}

		List<Column> columnList = new ArrayList<>();
		String escape = metaData.getSearchStringEscape();
		// The last three arguments of getColumns are LIKE patterns, so special characters in
		// the schema name and table name must be escaped. Per the PostgreSQL docs
		// (https://www.postgresql.org/docs/current/functions-matching.html#FUNCTIONS-LIKE)
		// the only LIKE metacharacters are % and _ (plus the escape character itself).
		try (ResultSet rs = metaData.getColumns(null, escapePattern(tableName.getSchemaName(), escape), escapePattern(tableName.getTableName(), escape), "%")) {
			while (rs.next()) {
				// Ordinals follow DatabaseMetaData.getColumns: 4=COLUMN_NAME, 5=DATA_TYPE,
				// 6=TYPE_NAME, 7=COLUMN_SIZE, 9=DECIMAL_DIGITS, 11=NULLABLE, 12=REMARKS, 13=COLUMN_DEF.
				Column column = new Column();
				column.setName(rs.getString(4));
				column.setType(rs.getInt(5));
				column.setTypeName(rs.getString(6));
				column.setPrecision(rs.getInt(7));
				column.setScale(rs.getInt(9));
				// NOTE(review): NULLABLE == 2 (columnNullableUnknown) is treated as NOT NULL here —
				// confirm this is intended for Hologres.
				column.setAllowNull(rs.getInt(11) == 1);
				column.setComment(rs.getString(12));
				column.setDefaultValue(rs.getObject(13));
				// PostgreSQL-style array types are reported with a leading underscore (e.g. "_int4").
				column.setArrayType(column.getTypeName().startsWith("_"));
				column.setPrimaryKey(primaryKeyList.contains(column.getName()));
				columnList.add(column);
			}
		}

		String partitionColumnName = getPartitionColumnName(conn, tableName);

		// Fetch the table-level properties we care about in a single query.
		String sql =
				"select property_key,property_value from hologres.hg_table_properties where table_namespace=? and table_name=? and property_key in "
						+ "('distribution_key','table_id','schema_version','orientation','clustering_key','segment_key','bitmap_columns','dictionary_encoding_columns','time_to_live_in_seconds','binlog.level')";
		Map<String, String> properties = new HashMap<>();
		try (PreparedStatement stat = conn.prepareStatement(sql)) {
			stat.setString(1, tableName.getSchemaName());
			stat.setString(2, tableName.getTableName());
			try (ResultSet rs = stat.executeQuery()) {
				while (rs.next()) {
					properties.put(rs.getString(1), rs.getString(2));
				}
			}
		}
		// Every existing table has at least table_id and schema_version; an empty result
		// therefore means the table itself was not found.
		if (properties.isEmpty()) {
			throw new SQLException("can not found table " + tableName.getFullName());
		}
		String tableId = properties.get("table_id");
		String schemaVersion = properties.get("schema_version");
		if (tableId == null) {
			throw new SQLException("table " + tableName.getFullName() + " has no table_id");
		}
		if (schemaVersion == null) {
			throw new SQLException("table " + tableName.getFullName() + " has no schemaVersion");
		}

		TableSchema.Builder builder = new TableSchema.Builder(tableId, schemaVersion);
		builder.setPartitionColumnName(partitionColumnName);
		builder.setColumns(columnList);
		builder.setTableName(tableName);
		builder.setNotExist(false);
		builder.setSensitive(true);

		// Map each known property onto the builder; unknown keys are ignored.
		for (Map.Entry<String, String> entry : properties.entrySet()) {
			String key = entry.getKey();
			String value = entry.getValue();
			switch (key) {
				case "distribution_key":
					// An empty distribution key would make the comma-split yield a single
					// empty element; reject it explicitly instead.
					if ("".equals(value)) {
						throw new SQLException("empty distribution_key is not supported.");
					}
					builder.setDistributionKeys(value.split(","));
					break;
				case "orientation":
					builder.setOrientation(value);
					break;
				case "clustering_key":
					builder.setClusteringKey(value.split(","));
					break;
				case "segment_key":
					builder.setSegmentKey(value.split(","));
					break;
				case "bitmap_columns":
					builder.setBitmapIndexKey(value.split(","));
					break;
				case "dictionary_encoding_columns":
					builder.setDictionaryEncoding(value.split(","));
					break;
				case "time_to_live_in_seconds":
					builder.setLifecycle(Long.parseLong(value));
					break;
				case "binlog.level":
					builder.setBinlogLevel(value);
					break;
				default:
					// table_id / schema_version were already consumed above; anything else is ignored.
			}
		}

		if (partitionColumnName != null) {
			// Query the auto-partitioning config; the table is absent on older Hologres
			// versions, in which case auto-partitioning is simply unavailable.
			sql = "SELECT time_unit,time_zone,num_precreate from hologres.hg_partitioning_config where enable = true and nsp_name=? and tbl_name=?";
			try (PreparedStatement stat = conn.prepareStatement(sql)) {
				stat.setString(1, tableName.getSchemaName());
				stat.setString(2, tableName.getTableName());
				try (ResultSet rs = stat.executeQuery()) {
					while (rs.next()) {
						builder.setAutoPartitioningEnable(true);
						builder.setAutoPartitioningTimeUnit(rs.getString("time_unit"));
						builder.setAutoPartitioningTimeZone(rs.getString("time_zone"));
						builder.setAutoPartitioningPreCreateNum(rs.getInt("num_precreate"));
					}
				}
			} catch (SQLException e) {
				// NOTE(review): message matching is fragile (locale / version dependent);
				// checking SQLState 42P01 (undefined_table) would be more robust.
				if (e.getMessage().contains("relation \"hologres.hg_partitioning_config\" does not exist")) {
					builder.setAutoPartitioningEnable(false);
				} else {
					throw e;
				}
			}
		}

		TableSchema tableSchema = builder.build();
		tableSchema.calculateProperties();
		return tableSchema;
	}