public String loadData()

in adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/AdsHelper.java [213:321]


    /**
     * Builds a {@code LOAD DATA FROM ... [OVERWRITE] INTO TABLE schema.table [PARTITION (...)]}
     * statement, submits it over a freshly opened JDBC connection and returns the job id read
     * from the first column of the result set.
     *
     * <p>If the result set yields several rows, the value from the last row is returned.
     *
     * <p>NOTE(review): {@code table}, {@code partition} and {@code sourcePath} are concatenated
     * into the statement text without escaping, so callers must only pass trusted values.
     *
     * @param table      target table name (without schema); must not be {@code null}
     * @param partition  optional partition spec, with or without surrounding parentheses;
     *                   {@code null} or blank means no {@code PARTITION} clause is emitted
     * @param sourcePath source path, with or without surrounding single quotes; must not be
     *                   {@code null}
     * @param overwrite  whether to add the {@code OVERWRITE} keyword
     * @return the job id reported for the submitted statement, never {@code null}
     * @throws AdsException with {@code ADS_LOADDATA_TABLE_NULL} / {@code ADS_LOADDATA_SOURCEPATH_NULL}
     *                      for missing arguments, one of the {@code ADS_CONN_*_NOT_SET} codes when
     *                      connection settings are missing, {@code ADS_LOADDATA_FAILED} when driver
     *                      loading, connecting or executing fails, and
     *                      {@code ADS_LOADDATA_JOBID_NOT_AVAIL} when no job id was returned
     */
    public String loadData(String table, String partition, String sourcePath, boolean overwrite) throws AdsException {

        if (table == null) {
            throw new AdsException(AdsException.ADS_LOADDATA_TABLE_NULL, "ADS LOAD DATA table is null.", null);
        }

        if (sourcePath == null) {
            throw new AdsException(AdsException.ADS_LOADDATA_SOURCEPATH_NULL, "ADS LOAD DATA source path is null.",
                    null);
        }

        if (adsURL == null) {
            throw new AdsException(AdsException.ADS_CONN_URL_NOT_SET, "ADS JDBC connection URL was not set.", null);
        }

        if (userName == null) {
            throw new AdsException(AdsException.ADS_CONN_USERNAME_NOT_SET,
                    "ADS JDBC connection user name was not set.", null);
        }

        if (password == null) {
            throw new AdsException(AdsException.ADS_CONN_PASSWORD_NOT_SET, "ADS JDBC connection password was not set.",
                    null);
        }

        if (schema == null) {
            throw new AdsException(AdsException.ADS_CONN_SCHEMA_NOT_SET, "ADS JDBC connection schema was not set.",
                    null);
        }

        StringBuilder sb = new StringBuilder();
        sb.append("LOAD DATA FROM ");
        // Accept the path both already quoted and bare; quote it only when needed.
        if (sourcePath.startsWith("'") && sourcePath.endsWith("'")) {
            sb.append(sourcePath);
        } else {
            sb.append("'").append(sourcePath).append("'");
        }
        if (overwrite) {
            sb.append(" OVERWRITE");
        }
        sb.append(" INTO TABLE ");
        sb.append(schema).append(".").append(table);
        if (partition != null && !partition.trim().equals("")) {
            // Use the trimmed value for both the parenthesis check and the emitted clause so
            // surrounding whitespace never ends up inside the statement.
            String partitionTrim = partition.trim();
            if (partitionTrim.startsWith("(") && partitionTrim.endsWith(")")) {
                sb.append(" PARTITION ").append(partitionTrim);
            } else {
                sb.append(" PARTITION (").append(partitionTrim).append(")");
            }
        }
        String loadSql = sb.toString();

        String jobId = null;
        Connection connection = null;
        Statement statement = null;
        ResultSet rs = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            String url = AdsUtil.prepareJdbcUrl(this.adsURL, this.schema, this.socketTimeout, this.suffix);
            Properties connectionProps = new Properties();
            connectionProps.put("user", userName);
            connectionProps.put("password", password);
            connection = DriverManager.getConnection(url, connectionProps);
            statement = connection.createStatement();
            LOG.info("正在从ODPS数据库导数据到ADS中: "+loadSql);
            LOG.info("由于ADS的限制,ADS导数据最少需要20分钟,请耐心等待");
            rs = statement.executeQuery(loadSql);

            // Keep overwriting so that the value from the last row wins.
            while (DBUtil.asyncResultSetNext(rs)) {
                jobId = rs.getString(1);
            }
        } catch (Exception e) {
            // Driver lookup, connection and execution failures all map to the same error code.
            throw new AdsException(AdsException.ADS_LOADDATA_FAILED, e.getMessage(), e);
        } finally {
            // Best-effort cleanup: a failure to close must not mask the primary outcome.
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException ignored) {
                    // Ignore exception
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException ignored) {
                    // Ignore exception
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException ignored) {
                    // Ignore exception
                }
            }
        }

        // Checked outside the try block so the generic catch above cannot re-wrap this
        // exception and replace its specific error code with ADS_LOADDATA_FAILED.
        if (jobId == null) {
            throw new AdsException(AdsException.ADS_LOADDATA_JOBID_NOT_AVAIL,
                    "Job id is not available for the submitted LOAD DATA.", null);
        }

        return jobId;
    }