in odps-sqoop/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java [74:182]
/**
 * Prepares this task for a pg_bulkload export.
 *
 * <p>Two things happen, in order:
 * <ol>
 *   <li>A per-task staging table named
 *       {@code <output table>_<task attempt id>} is created with
 *       {@code CREATE TABLE ... (LIKE <output table> INCLUDING CONSTRAINTS)},
 *       optionally dropping a leftover table of the same name first
 *       ({@code pgbulkload.clear.staging.table}) and optionally placing it in
 *       the tablespace named by {@code pgbulkload.staging.tablespace}.</li>
 *   <li>The pg_bulkload binary is started with {@code --input=stdin} and
 *       {@code --output=<staging table>}. Its stdin is wrapped in
 *       {@code writer} and its stderr is handed to a {@code ReadThread}.</li>
 * </ol>
 *
 * @param context task context; supplies the configuration and the task
 *                attempt id used to name the staging table
 * @throws IOException if the JDBC driver class cannot be loaded, a SQL
 *                     statement fails, or the pg_bulkload process cannot be
 *                     started
 * @throws InterruptedException as declared by the signature; may originate
 *                     from {@code super.setup(context)} or from
 *                     {@code cleanup(context)} on the failure path
 */
protected void setup(Context context)
    throws IOException, InterruptedException {
  super.setup(context);
  conf = context.getConfiguration();
  dbConf = new DBConfiguration(conf);
  tableName = dbConf.getOutputTableName();
  // The task attempt id makes the staging table name unique per attempt.
  tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();

  // ---- Step 1: create the staging table --------------------------------
  Connection conn = null;
  try {
    conn = dbConf.getConnection();
    conn.setAutoCommit(false);
    // NOTE(review): the statements below go through doExecuteUpdate(), which
    // is not handed this connection -- confirm whether the commit on `conn`
    // is what actually makes them durable.
    if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
      doExecuteUpdate("DROP TABLE IF EXISTS " + tmpTableName);
    }

    StringBuilder createSql = new StringBuilder();
    createSql.append("CREATE TABLE ");
    createSql.append(tmpTableName);
    createSql.append("(LIKE ");
    createSql.append(tableName);
    createSql.append(" INCLUDING CONSTRAINTS)");
    String tablespace = conf.get("pgbulkload.staging.tablespace");
    if (tablespace != null) {
      createSql.append("TABLESPACE ");
      createSql.append(tablespace);
    }
    doExecuteUpdate(createSql.toString());
    conn.commit();
  } catch (ClassNotFoundException ex) {
    LOG.error("Unable to load JDBC driver class", ex);
    throw new IOException(ex);
  } catch (SQLException ex) {
    LoggingUtils.logAll(LOG, "Unable to execute statement", ex);
    throw new IOException(ex);
  } finally {
    // conn is still null when getConnection() itself failed; closing it
    // unguarded would raise a NullPointerException from this finally block
    // and replace the IOException that describes the real failure.
    if (conn != null) {
      try {
        conn.close();
      } catch (SQLException ex) {
        LoggingUtils.logAll(LOG, "Unable to close connection", ex);
      }
    }
  }

  // ---- Step 2: launch pg_bulkload --------------------------------------
  try {
    ArrayList<String> args = new ArrayList<String>();
    List<String> envp = Executor.getCurEnvpStrings();
    String jdbcUrl = conf.get(DBConfiguration.URL_PROPERTY);

    args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
    args.add("--username="
        + conf.get(DBConfiguration.USERNAME_PROPERTY));
    args.add("--dbname=" + JdbcUrl.getDatabaseName(jdbcUrl));
    args.add("--host=" + JdbcUrl.getHostName(jdbcUrl));
    int port = JdbcUrl.getPort(jdbcUrl);
    // -1 is treated as "no port in the URL": omit --port in that case.
    if (port != -1) {
      args.add("--port=" + port);
    }
    args.add("--input=stdin");
    args.add("--output=" + tmpTableName);
    args.add("-o");
    args.add("TYPE=CSV");
    args.add("-o");
    args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
    args.add("-o");
    args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
    args.add("-o");
    args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
    // NOTE(review): the next three options have no default here, so an unset
    // key is passed as the literal text "null" -- presumably the job setup
    // always populates them; confirm against the caller.
    args.add("-o");
    args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
    args.add("-o");
    args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
    args.add("-o");
    args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
    String nullString = conf.get("pgbulkload.null.string");
    if (nullString != null) {
      args.add("-o");
      args.add("NULL=" + nullString);
    }
    String filter = conf.get("pgbulkload.filter");
    if (filter != null) {
      args.add("-o");
      args.add("FILTER=" + filter);
    }

    LOG.debug("Starting pg_bulkload with arguments:");
    for (String arg : args) {
      LOG.debug(" " + arg);
    }

    // The password is never placed in args; when configured it is written to
    // a file and passed to the child process through PGPASSFILE.
    String password = conf.get(DBConfiguration.PASSWORD_PROPERTY);
    if (password != null) {
      String tmpDir = System.getProperty("test.build.data", "/tmp/");
      if (!tmpDir.endsWith(File.separator)) {
        tmpDir = tmpDir + File.separator;
      }
      // job.local.dir, when set, replaces the default computed above (the
      // trailing-separator normalization applies only to that default).
      tmpDir = conf.get("job.local.dir", tmpDir);
      passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir, password);
      envp.add("PGPASSFILE=" + passwordFilename);
    }

    process = Runtime.getRuntime().exec(args.toArray(new String[0]),
        envp.toArray(new String[0]));
    out = process.getOutputStream();
    writer = new BufferedWriter(new OutputStreamWriter(out));
    // Consume the child's stderr on a separate thread.
    thread = new ReadThread(process.getErrorStream());
    thread.start();
  } catch (Exception e) {
    // Startup failed: release whatever was opened, drop the staging table
    // created in step 1, and report the failure as an IOException.
    LOG.error("Can't start up pg_bulkload process", e);
    cleanup(context);
    doExecuteUpdate("DROP TABLE " + tmpTableName);
    throw new IOException(e);
  }
}