public TableInfo getTableInfo()

in adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/AdsHelper.java [86:201]


    /**
     * Loads the column list and table-level attributes of an ADS table by querying
     * {@code information_schema.columns} and {@code information_schema.tables} over JDBC.
     *
     * <p>The schema and table names are lower-cased before being placed in the queries.
     * Case conversion uses {@link java.util.Locale#ROOT} so the result does not depend on
     * the JVM default locale (e.g. the Turkish dotless-i mapping).
     *
     * @param table name of the table to describe; must not be {@code null}
     * @return a {@code TableInfo} populated with the ordered columns and, when the table
     *         metadata query returns a row, the update type, partition settings and
     *         primary key columns
     * @throws AdsException    if {@code table} or any of the connection settings (URL, user name,
     *                         password, schema) is {@code null}, or if any other failure occurs
     *                         while connecting or querying (reported with code
     *                         {@code AdsException.OTHER} and the original cause attached)
     * @throws DataXException  (unchecked) with {@code AdsWriterErrorCode.NO_ADS_TABLE} when the
     *                         column query returns no rows
     */
    public TableInfo getTableInfo(String table) throws AdsException {

        if (table == null) {
            throw new AdsException(AdsException.ADS_TABLEMETA_TABLE_NULL, "Table is null.", null);
        }

        if (adsURL == null) {
            throw new AdsException(AdsException.ADS_CONN_URL_NOT_SET, "ADS JDBC connection URL was not set.", null);
        }

        if (userName == null) {
            throw new AdsException(AdsException.ADS_CONN_USERNAME_NOT_SET,
                    "ADS JDBC connection user name was not set.", null);
        }

        if (password == null) {
            throw new AdsException(AdsException.ADS_CONN_PASSWORD_NOT_SET, "ADS JDBC connection password was not set.",
                    null);
        }

        if (schema == null) {
            throw new AdsException(AdsException.ADS_CONN_SCHEMA_NOT_SET, "ADS JDBC connection schema was not set.",
                    null);
        }

        Connection connection = null;
        Statement statement = null;
        ResultSet rs = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            String url = AdsUtil.prepareJdbcUrl(this.adsURL, this.schema, this.socketTimeout, this.suffix);

            Properties connectionProps = new Properties();
            connectionProps.put("user", userName);
            connectionProps.put("password", password);
            connection = DriverManager.getConnection(url, connectionProps);
            statement = connection.createStatement();

            // ADS table and schema names are case-insensitive, so they are lower-cased for the
            // user's convenience. Locale.ROOT keeps the conversion independent of the JVM locale.
            String schemaLower = schema.toLowerCase(java.util.Locale.ROOT);
            String tableLower = table.toLowerCase(java.util.Locale.ROOT);

            // Column order matters: rows are requested sorted by ordinal_position.
            String columnMetaSql = String.format("select ordinal_position,column_name,data_type,type_name,column_comment from information_schema.columns where table_schema = `'%s'` and table_name = `'%s'` order by ordinal_position", schemaLower, tableLower);
            LOG.info(String.format("检查列信息sql语句:%s", columnMetaSql));
            rs = statement.executeQuery(columnMetaSql);

            TableInfo tableInfo = new TableInfo();
            List<ColumnInfo> columnInfoList = new ArrayList<ColumnInfo>();
            while (DBUtil.asyncResultSetNext(rs)) {
                ColumnInfo columnInfo = new ColumnInfo();
                columnInfo.setOrdinal(rs.getInt(1));
                columnInfo.setName(rs.getString(2));
                // The type is resolved from type_name (column 4), which per the original author's
                // note works for ADS 0.7 and 0.8; data_type (column 3) is selected but not read.
                columnInfo.setDataType(
                        ColumnDataType.getTypeByName(rs.getString(4).toUpperCase(java.util.Locale.ROOT)));
                columnInfo.setComment(rs.getString(5));
                columnInfoList.add(columnInfo);
            }
            if (columnInfoList.isEmpty()) {
                throw DataXException.asDataXException(AdsWriterErrorCode.NO_ADS_TABLE, table + "不存在或者查询不到列信息. ");
            }
            tableInfo.setColumns(columnInfoList);
            tableInfo.setTableSchema(schema);
            tableInfo.setTableName(table);
            // Release the first cursor before reusing the connection for the second query.
            DBUtil.closeDBResources(rs, statement, null);

            String tableMetaSql = String.format("select update_type, partition_type, partition_column, partition_count, primary_key_columns from information_schema.tables where table_schema = `'%s'` and table_name = `'%s'`", schemaLower, tableLower);
            LOG.info(String.format("检查表信息sql语句:%s", tableMetaSql));
            statement = connection.createStatement();
            rs = statement.executeQuery(tableMetaSql);
            // Only the first row is consumed; any further rows are ignored.
            if (DBUtil.asyncResultSetNext(rs)) {
                tableInfo.setUpdateType(rs.getString(1));
                tableInfo.setPartitionType(rs.getString(2));
                tableInfo.setPartitionColumn(rs.getString(3));
                tableInfo.setPartitionCount(rs.getInt(4));
                // primary_key_columns is a comma-separated list; a table may have several key columns.
                String primaryKeyColumns = rs.getString(5);
                if (StringUtils.isNotBlank(primaryKeyColumns)) {
                    tableInfo.setPrimaryKeyColumns(Arrays.asList(StringUtils.split(primaryKeyColumns, ",")));
                } else {
                    tableInfo.setPrimaryKeyColumns(null);
                }
            }
            DBUtil.closeDBResources(rs, statement, null);
            return tableInfo;

        } catch (DataXException e) {
            // Propagate DataX errors (e.g. NO_ADS_TABLE) untouched so their error code survives.
            throw e;
        } catch (Exception e) {
            // Driver loading, SQL and any other failure are all reported the same way.
            throw new AdsException(AdsException.OTHER, e.getMessage(), e);
        } finally {
            // Best-effort cleanup: a failure to close must not mask the primary outcome.
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException ignored) {
                    // Ignore exception
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException ignored) {
                    // Ignore exception
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException ignored) {
                    // Ignore exception
                }
            }
        }

    }