public static void main()

in example/src/main/java/DataDumpBenchmark.java [39:210]


  /**
   * Benchmarks copying one slice of a source table into a freshly created target table
   * over two JDBC connections configured from {@code conf.properties}.
   *
   * <p>Steps: read the source column list, drop and re-create the target table with the
   * same columns, query the min/max of the split column, cut that range into
   * {@code splitNum} contiguous intervals, then copy each interval in its own task using
   * batched inserts of {@code batchSize} rows. Elapsed time per batch and in total is
   * printed to standard output.
   *
   * @param args {@code args[0]} is the number of splits, {@code args[1]} the insert batch
   *             size; both must be positive integers. On bad arguments, a missing driver
   *             or an unreadable configuration the problem is printed and the method
   *             returns without touching either database.
   * @throws SQLException if the setup queries fail, or if any split task fails (the task's
   *                      exception is attached as the cause)
   */
  public static void main(String[] args) throws SQLException {

    final int batchSize;
    final int splitNum;
    final Properties odpsConfig = new Properties();
    try {
      splitNum = Integer.parseInt(args[0]);
      batchSize = Integer.parseInt(args[1]);
      // Reject non-positive values up front: they would otherwise fail with an array
      // index error or a division by zero only after the target table was re-created.
      if (splitNum < 1 || batchSize < 1) {
        throw new IllegalArgumentException(
            "split number and batch size must be positive, got " + splitNum + " and " + batchSize);
      }
      System.out.println("split size: " + splitNum);
      System.out.println("batch size: " + batchSize);

      Class.forName(driverName);

      InputStream is =
          Thread.currentThread().getContextClassLoader().getResourceAsStream("conf.properties");
      if (is == null) {
        throw new IllegalStateException("conf.properties not found on the classpath");
      }
      try {
        odpsConfig.load(is);
      } finally {
        is.close();
      }

    } catch (Exception e) {
      System.out.println(e);
      return;
    }

    // Connection settings are resolved once and shared by the setup phase and every task.
    final String sourceUrl =
        "jdbc:odps:" + odpsConfig.getProperty("endpoint_1") + "?project=meta_dev&loglevel=debug&";
    final String sourceUser = odpsConfig.getProperty("access_id_1");
    final String sourceKey = odpsConfig.getProperty("access_key_1");
    final String targetUrl =
        "jdbc:odps:" + odpsConfig.getProperty("endpoint_2")
        + "?project=odps_test_sqltask_finance&loglevel=debug";
    final String targetUser = odpsConfig.getProperty("access_id_2");
    final String targetKey = odpsConfig.getProperty("access_key_2");

    String sourceSchema = "meta";
    String sourceTable = "m_instance";
    String targetTable = "m_instance_copy";
    String sourceWhere = "ds = '20151120'";
    String splitColumn = "start_time";

    // The metadata lookup needs the bare table name; every query uses the qualified one.
    final String qualifiedSource =
        sourceSchema != null ? sourceSchema + "." + sourceTable : sourceTable;

    List<String> nameTypePairs = new ArrayList<String>();
    List<String> questionMarks = new ArrayList<String>();
    final String[] cutters = new String[splitNum];

    Connection connSource = DriverManager.getConnection(sourceUrl, sourceUser, sourceKey);
    try {
      Connection connTarget = DriverManager.getConnection(targetUrl, targetUser, targetKey);
      try {
        ResultSet cols = connSource.getMetaData().getColumns(null, sourceSchema, sourceTable, null);
        try {
          while (cols.next()) {
            nameTypePairs.add(cols.getString("COLUMN_NAME") + " " + cols.getString("TYPE_NAME"));
            questionMarks.add("?");
          }
        } finally {
          cols.close();
        }

        String createTableSql = "create table " + targetTable + "(" +
                                joinStrings(nameTypePairs.toArray(new String[nameTypePairs.size()]),
                                            ", ") + ")";

        String rangeSql =
            "select max(" + splitColumn + "), min(" + splitColumn + ") from " + qualifiedSource;
        if (sourceWhere != null) {
          rangeSql += " where " + sourceWhere;
        }

        Statement ddl = connTarget.createStatement();
        try {
          ddl.executeUpdate("drop table if exists " + targetTable);
          ddl.executeUpdate(createTableSql);
        } finally {
          ddl.close();
        }

        long max;
        long min;
        Statement query = connSource.createStatement();
        try {
          ResultSet range = query.executeQuery(rangeSql);
          try {
            if (!range.next()) {
              throw new SQLException("range query returned no row: " + rangeSql);
            }
            max = range.getLong(1);
            min = range.getLong(2);
          } finally {
            range.close();
          }
        } finally {
          query.close();
        }

        // markers[0..splitNum] are the interval boundaries: equal-width steps from min,
        // with the last boundary pinned to max so integer-division remainder is not lost.
        long[] markers = new long[splitNum + 1];
        markers[0] = min;
        for (int i = 1; i < splitNum; i++) {
          markers[i] = markers[i - 1] + (max - min + 1) / splitNum;
        }
        markers[splitNum] = max;

        // Every interval is half-open except the last, which is closed so max is included.
        for (int i = 0; i < splitNum - 1; i++) {
          cutters[i] =
              markers[i] + " <= " + splitColumn + " and " + splitColumn + " < " + markers[i + 1];
        }
        cutters[splitNum - 1] =
            markers[splitNum - 1] + " <= " + splitColumn + " and " + splitColumn + " <= "
            + markers[splitNum];
      } finally {
        connTarget.close();
      }
    } finally {
      connSource.close();
    }

    final int colNums = nameTypePairs.size();
    final String insertValuesSql = "insert into " + targetTable + " values (" +
                             joinStrings(questionMarks.toArray(new String[questionMarks.size()]),
                                         ", ") + ")";

    long start = System.currentTimeMillis();

    List<Callable<Long>> callList = new ArrayList<Callable<Long>>(splitNum);
    for (int i = 0; i < splitNum; i++) {

      String selectSql = "select * from " + qualifiedSource + " where " + cutters[i];
      if (sourceWhere != null) {
        selectSql += " and " + sourceWhere;
      }

      final String splitSelectSql = selectSql;
      callList.add(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
          // Each task opens its own pair of connections and always releases them.
          Connection src = DriverManager.getConnection(sourceUrl, sourceUser, sourceKey);
          try {
            Connection dst = DriverManager.getConnection(targetUrl, targetUser, targetKey);
            try {
              Statement query = null;
              PreparedStatement insert = null;
              ResultSet rs = null;
              try {
                query = src.createStatement();
                insert = dst.prepareStatement(insertValuesSql);
                rs = query.executeQuery(splitSelectSql);
                long lastBatchEnd = System.currentTimeMillis();

                int count = 0;
                while (rs.next()) {
                  // A failure here propagates out of call() and is reported by main,
                  // instead of terminating the JVM from a worker thread.
                  for (int col = 1; col <= colNums; col++) {
                    insert.setObject(col, rs.getObject(col));
                  }

                  insert.addBatch();
                  if (++count % batchSize == 0) {
                    insert.executeBatch();
                    long end = System.currentTimeMillis();
                    System.out.printf("batch time: %.2f seconds\n",
                                      (float) (end - lastBatchEnd) / 1000);
                    lastBatchEnd = end;
                  }
                }

                // Flush the trailing partial batch, if there is one.
                if (count % batchSize != 0) {
                  insert.executeBatch();
                }
              } finally {
                // If one of these close() calls throws, the connection closes below
                // still run.
                if (rs != null) {
                  rs.close();
                }
                if (query != null) {
                  query.close();
                }
                if (insert != null) {
                  insert.close();
                }
              }
            } finally {
              dst.close();
            }
          } finally {
            src.close();
          }
          return 0L;
        }
      });
    }

    // Two worker threads; additional splits wait in the pool's queue.
    ExecutorService executors = Executors.newFixedThreadPool(2);
    try {
      List<java.util.concurrent.Future<Long>> results = executors.invokeAll(callList);
      // invokeAll only returns once every task has finished; get() surfaces task failures
      // that would otherwise be silently dropped.
      for (java.util.concurrent.Future<Long> result : results) {
        result.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      e.printStackTrace();
      return;
    } catch (java.util.concurrent.ExecutionException e) {
      Throwable cause = e.getCause() != null ? e.getCause() : e;
      throw new SQLException("data dump split failed: " + cause, cause);
    } finally {
      // Always release the pool; its non-daemon threads would otherwise keep the JVM alive.
      executors.shutdown();
    }

    System.out.printf("total: %.2f minutes\n",
                      (float) (System.currentTimeMillis() - start) / 1000 / 60);
  }