protected void setup()

in odps-sqoop/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java [74:182]


  protected void setup(Context context)
      throws IOException, InterruptedException {
    super.setup(context);
    conf = context.getConfiguration();
    dbConf = new DBConfiguration(conf);
    tableName = dbConf.getOutputTableName();
    tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();

    Connection conn = null;
    try {
      conn = dbConf.getConnection();
      conn.setAutoCommit(false);
      if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
        StringBuffer query = new StringBuffer();
        query.append("DROP TABLE IF EXISTS ");
        query.append(tmpTableName);
        doExecuteUpdate(query.toString());
      }
      StringBuffer query = new StringBuffer();
      query.append("CREATE TABLE ");
      query.append(tmpTableName);
      query.append("(LIKE ");
      query.append(tableName);
      query.append(" INCLUDING CONSTRAINTS)");
      if (conf.get("pgbulkload.staging.tablespace") != null) {
        query.append("TABLESPACE ");
        query.append(conf.get("pgbulkload.staging.tablespace"));
      }
      doExecuteUpdate(query.toString());
      conn.commit();
    } catch (ClassNotFoundException ex) {
      LOG.error("Unable to load JDBC driver class", ex);
      throw new IOException(ex);
    } catch (SQLException ex) {
      LoggingUtils.logAll(LOG, "Unable to execute statement", ex);
      throw new IOException(ex);
    } finally {
      try {
        conn.close();
      } catch (SQLException ex) {
        LoggingUtils.logAll(LOG, "Unable to close connection", ex);
      }
    }

    try {
      ArrayList<String> args = new ArrayList<String>();
      List<String> envp = Executor.getCurEnvpStrings();
      args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
      args.add("--username="
          + conf.get(DBConfiguration.USERNAME_PROPERTY));
      args.add("--dbname="
          + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
      args.add("--host="
          + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
      int port = JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY));
      if (port != -1) {
        args.add("--port=" + port);
      }
      args.add("--input=stdin");
      args.add("--output=" + tmpTableName);
      args.add("-o");
      args.add("TYPE=CSV");
      args.add("-o");
      args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
      args.add("-o");
      args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
      args.add("-o");
      args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
      args.add("-o");
      args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
      args.add("-o");
      args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
      args.add("-o");
      args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
      if (conf.get("pgbulkload.null.string") != null) {
        args.add("-o");
        args.add("NULL=" + conf.get("pgbulkload.null.string"));
      }
      if (conf.get("pgbulkload.filter") != null) {
        args.add("-o");
        args.add("FILTER=" + conf.get("pgbulkload.filter"));
      }
      LOG.debug("Starting pg_bulkload with arguments:");
      for (String arg : args) {
        LOG.debug("  " + arg);
      }
      if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
        String tmpDir = System.getProperty("test.build.data", "/tmp/");
        if (!tmpDir.endsWith(File.separator)) {
          tmpDir = tmpDir + File.separator;
        }
        tmpDir = conf.get("job.local.dir", tmpDir);
        passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
            conf.get(DBConfiguration.PASSWORD_PROPERTY));
        envp.add("PGPASSFILE=" + passwordFilename);
      }
      process = Runtime.getRuntime().exec(args.toArray(new String[0]),
                                          envp.toArray(new String[0]));
      out = process.getOutputStream();
      writer = new BufferedWriter(new OutputStreamWriter(out));
      thread = new ReadThread(process.getErrorStream());
      thread.start();
    } catch (Exception e) {
      LOG.error("Can't start up pg_bulkload process", e);
      cleanup(context);
      doExecuteUpdate("DROP TABLE " + tmpTableName);
      throw new IOException(e);
    }
  }