in odps-sqoop/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java [111:236]
private void initMySQLImportProcess() throws IOException {
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
this.fifoFile = new File(taskAttemptDir,
conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
String filename = fifoFile.toString();
// Create the FIFO itself.
try {
new NamedFifo(this.fifoFile).create();
} catch (IOException ioe) {
// Command failed.
LOG.error("Could not mknod " + filename);
this.fifoFile = null;
throw new IOException(
"Could not create FIFO to interface with mysqlimport", ioe);
}
// Now open the connection to mysqlimport.
ArrayList<String> args = new ArrayList<String>();
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new IOException("Could not determine database name");
}
args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
String password = DBConfiguration.getPassword((JobConf) conf);
if (null != password && password.length() > 0) {
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
args.add("--defaults-file=" + passwordFile);
}
String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}
args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
args.add("--compress");
args.add("--local");
args.add("--silent");
// Specify the subset of columns we're importing.
DBConfiguration dbConf = new DBConfiguration(conf);
String [] cols = dbConf.getInputFieldNames();
if (null != cols) {
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String col : cols) {
if (!first) {
sb.append(",");
}
sb.append(col);
first = false;
}
args.add("--columns=" + sb.toString());
}
// Specify the delimiters to use.
int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
(int) ',');
int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
(int) '\n');
int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
boolean encloseRequired = conf.getBoolean(
MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
args.add("--fields-terminated-by=0x"
+ Integer.toString(outputFieldDelim, 16));
args.add("--lines-terminated-by=0x"
+ Integer.toString(outputRecordDelim, 16));
if (0 != enclosedBy) {
if (encloseRequired) {
args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
} else {
args.add("--fields-optionally-enclosed-by=0x"
+ Integer.toString(enclosedBy, 16));
}
}
if (0 != escapedBy) {
args.add("--fields-escaped-by=0x" + Integer.toString(escapedBy, 16));
}
// These two arguments are positional and must be last.
args.add(databaseName);
args.add(filename);
// Begin the export in an external process.
LOG.debug("Starting mysqlimport with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
}
// Actually start mysqlimport.
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
// Log everything it writes to stderr.
// Ignore anything on stdout.
this.outSink = new NullAsyncSink();
this.outSink.processStream(mysqlImportProcess.getInputStream());
this.errSink = new LoggingAsyncSink(LOG);
this.errSink.processStream(mysqlImportProcess.getErrorStream());
// Open the named FIFO after starting mysqlimport.
this.importStream = new BufferedOutputStream(
new FileOutputStream(fifoFile));
// At this point, mysqlimport is running and hooked up to our FIFO.
// The mapper just needs to populate it with data.
this.bytesWritten = 0;
}