in odps-sqoop/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java [204:313]
public void run() {
BufferedReader r = null;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
// Configure the output with the user's delimiters.
char outputFieldDelim = (char) conf.getInt(
MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
DelimiterSet.NULL_CHAR);
String outputFieldDelimStr = "" + outputFieldDelim;
char outputRecordDelim = (char) conf.getInt(
MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
DelimiterSet.NULL_CHAR);
String outputRecordDelimStr = "" + outputRecordDelim;
char outputEnclose = (char) conf.getInt(
MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
DelimiterSet.NULL_CHAR);
char outputEscape = (char) conf.getInt(
MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
DelimiterSet.NULL_CHAR);
boolean outputEncloseRequired = conf.getBoolean(
MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
DelimiterSet delimiters = new DelimiterSet(
outputFieldDelim,
outputRecordDelim,
outputEnclose,
outputEscape,
outputEncloseRequired);
// Actually do the read/write transfer loop here.
int preambleLen = -1; // set to this for "undefined"
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
continue; // comments and empty lines are ignored
}
// this line is of the form "INSERT .. VALUES ( actual value text
// );" strip the leading preamble up to the '(' and the trailing
// ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark)
+ recordStartMark.length();
}
// Wrap the input string in a char buffer that ignores the leading
// and trailing text.
CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
inLine.length() - 2);
// Pass this along to the parser
List<String> fields = null;
try {
fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
} catch (RecordParser.ParseError pe) {
LOG.warn("ParseError reading from mysqldump: "
+ pe.toString() + "; record skipped");
continue; // Skip emitting this row.
}
// For all of the output fields, emit them using the delimiters
// the user chooses.
boolean first = true;
StringBuilder sb = new StringBuilder();
int recordLen = 1; // for the delimiter.
for (String field : fields) {
if (!first) {
sb.append(outputFieldDelimStr);
} else {
first = false;
}
String fieldStr = FieldFormatter.escapeAndEnclose(field,
delimiters);
sb.append(fieldStr);
recordLen += fieldStr.length();
}
sb.append(outputRecordDelimStr);
context.write(sb.toString(), null);
counters.addBytes(recordLen);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so the parent can handle it appropriately.
setError();
} catch (InterruptedException ie) {
LOG.error("InterruptedException reading from mysqldump: "
+ ie.toString());
// flag this error so we get an error status back in the caller.
setError();
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
}
}