private void initMySQLImportProcess()

in odps-sqoop/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java [111:236]


  private void initMySQLImportProcess() throws IOException {
    File taskAttemptDir = TaskId.getLocalWorkPath(conf);

    this.fifoFile = new File(taskAttemptDir,
        conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
    String filename = fifoFile.toString();

    // Create the FIFO itself.
    try {
      new NamedFifo(this.fifoFile).create();
    } catch (IOException ioe) {
      // Command failed.
      LOG.error("Could not mknod " + filename);
      this.fifoFile = null;
      throw new IOException(
          "Could not create FIFO to interface with mysqlimport", ioe);
    }

    // Now open the connection to mysqlimport.
    ArrayList<String> args = new ArrayList<String>();

    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
    String databaseName = JdbcUrl.getDatabaseName(connectString);
    String hostname = JdbcUrl.getHostName(connectString);
    int port = JdbcUrl.getPort(connectString);

    if (null == databaseName) {
      throw new IOException("Could not determine database name");
    }

    args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
    String password = DBConfiguration.getPassword((JobConf) conf);

    if (null != password && password.length() > 0) {
      passwordFile = new File(MySQLUtils.writePasswordFile(conf));
      args.add("--defaults-file=" + passwordFile);
    }

    String username = conf.get(MySQLUtils.USERNAME_KEY);
    if (null != username) {
      args.add("--user=" + username);
    }

    args.add("--host=" + hostname);
    if (-1 != port) {
      args.add("--port=" + Integer.toString(port));
    }

    args.add("--compress");
    args.add("--local");
    args.add("--silent");

    // Specify the subset of columns we're importing.
    DBConfiguration dbConf = new DBConfiguration(conf);
    String [] cols = dbConf.getInputFieldNames();
    if (null != cols) {
      StringBuilder sb = new StringBuilder();
      boolean first = true;
      for (String col : cols) {
        if (!first) {
          sb.append(",");
        }
        sb.append(col);
        first = false;
      }

      args.add("--columns=" + sb.toString());
    }

    // Specify the delimiters to use.
    int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
        (int) ',');
    int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
        (int) '\n');
    int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
    int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
    boolean encloseRequired = conf.getBoolean(
        MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

    args.add("--fields-terminated-by=0x"
        + Integer.toString(outputFieldDelim, 16));
    args.add("--lines-terminated-by=0x"
        + Integer.toString(outputRecordDelim, 16));
    if (0 != enclosedBy) {
      if (encloseRequired) {
        args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
      } else {
        args.add("--fields-optionally-enclosed-by=0x"
            + Integer.toString(enclosedBy, 16));
      }
    }

    if (0 != escapedBy) {
      args.add("--fields-escaped-by=0x" + Integer.toString(escapedBy, 16));
    }

    // These two arguments are positional and must be last.
    args.add(databaseName);
    args.add(filename);

    // Begin the export in an external process.
    LOG.debug("Starting mysqlimport with arguments:");
    for (String arg : args) {
      LOG.debug("  " + arg);
    }

    // Actually start mysqlimport.
    mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));

    // Log everything it writes to stderr.
    // Ignore anything on stdout.
    this.outSink = new NullAsyncSink();
    this.outSink.processStream(mysqlImportProcess.getInputStream());

    this.errSink = new LoggingAsyncSink(LOG);
    this.errSink.processStream(mysqlImportProcess.getErrorStream());

    // Open the named FIFO after starting mysqlimport.
    this.importStream = new BufferedOutputStream(
        new FileOutputStream(fifoFile));

    // At this point, mysqlimport is running and hooked up to our FIFO.
    // The mapper just needs to populate it with data.

    this.bytesWritten = 0;
  }