protected static Job createSubmittableJob()

in hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java [455:602]


  /**
   * Builds, but does not submit, the MapReduce {@link Job} for a TSV import.
   * <p>
   * {@code args[0]} is the target table name and {@code args[1]} is the input path. The mapper is
   * the class named by {@code MAPPER_CONF_KEY}, or {@code DEFAULT_MAPPER} when that key is unset.
   * <p>
   * Two modes are supported, selected by whether {@code BULK_OUTPUT_CONF_KEY} is set:
   * <ul>
   * <li><b>Bulk output</b> (key set): a missing table is created unless
   * {@code CREATE_TABLE_CONF_KEY} is set to something other than "yes", in which case a
   * {@code TableNotFoundException} is thrown. Unless {@code NO_STRICT_COL_FAMILY} is true, the
   * requested column families are checked against the table; on mismatch the usage text is printed
   * and the JVM exits with status -1.</li>
   * <li><b>Direct write</b> (key unset): the table must already exist
   * ({@code TableNotFoundException} otherwise) and must contain every requested column family
   * ({@code NoSuchColumnFamilyException} otherwise). {@code TsvImporterTextMapper} is rejected
   * with the usage text and a JVM exit with status -1. The job runs with zero reducers.</li>
   * </ul>
   * When {@code DRY_RUN_CONF_KEY} is true, no real output format is configured; the job writes to
   * {@code NullOutputFormat} instead. If a table was created during a dry run,
   * {@code DRY_RUN_TABLE_CREATED} is set.
   * <p>
   * Side effect: if {@code SEPARATOR_CONF_KEY} is present in {@code conf}, its value is replaced
   * in place with its Base64 encoding before the job is created.
   * @param conf configuration the job is derived from; see the side effect described above
   * @param args positional arguments: table name followed by input directory
   * @return the configured job
   * @throws IOException            if talking to the cluster or reading the credentials file fails
   * @throws ClassNotFoundException if the class named by {@code MAPPER_CONF_KEY} cannot be loaded
   */
  protected static Job createSubmittableJob(Configuration conf, String[] args)
    throws IOException, ClassNotFoundException {
    boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Admin admin = connection.getAdmin()) {
      // Support non-XML supported characters
      // by re-encoding the passed separator as a Base64 string.
      String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
      if (actualSeparator != null) {
        conf.set(SEPARATOR_CONF_KEY,
          Bytes.toString(Base64.getEncoder().encode(Bytes.toBytes(actualSeparator))));
      }

      // See if a non-default Mapper was set
      String mapperClassName = conf.get(MAPPER_CONF_KEY);
      Class mapperClass =
        mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;

      TableName tableName = TableName.valueOf(args[0]);
      Path inputDir = new Path(args[1]);
      String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName.getNameAsString());
      Job job = Job.getInstance(conf, jobName);
      job.setJarByClass(mapperClass);
      FileInputFormat.setInputPaths(job, inputDir);
      job.setInputFormatClass(TextInputFormat.class);
      job.setMapperClass(mapperClass);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
      String[] columns = conf.getStrings(COLUMNS_CONF_KEY);

      // Optionally merge credentials from a token storage file into the job.
      String credentialsLocation = conf.get(CREDENTIALS_LOCATION);
      if (StringUtils.isNotEmpty(credentialsLocation)) {
        Credentials cred = Credentials.readTokenStorageFile(new File(credentialsLocation), conf);
        job.getCredentials().addAll(cred);
      }

      if (hfileOutPath != null) {
        // Bulk output: the job produces HFiles rather than writing to the table.
        if (!admin.tableExists(tableName)) {
          LOG.warn(format("Table '%s' does not exist.", tableName));
          if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
            // TODO: this is backwards. Instead of depending on the existence of a table,
            // create a sane splits file for HFileOutputFormat based on data sampling.
            createTable(admin, tableName, columns);
            if (isDryRun) {
              LOG.warn("Dry run: Table will be deleted at end of dry run.");
              synchronized (ImportTsv.class) {
                DRY_RUN_TABLE_CREATED = true;
              }
            }
          } else {
            String errorMsg = format("Table '%s' does not exist and '%s' is set to no.",
              tableName, CREATE_TABLE_CONF_KEY);
            LOG.error(errorMsg);
            throw new TableNotFoundException(errorMsg);
          }
        }
        try (Table table = connection.getTable(tableName);
          RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
          // Remembered once fetched so later steps can reuse it instead of asking the table again.
          TableDescriptor tableDescriptor = null;
          // if no.strict is false then check column family
          if (!conf.getBoolean(NO_STRICT_COL_FAMILY, false)) {
            Set<String> requestedFamilies = getColumnFamilies(columns);
            tableDescriptor = table.getDescriptor();
            ArrayList<String> unmatchedFamilies =
              findUnmatchedFamilies(tableDescriptor, requestedFamilies);
            if (!unmatchedFamilies.isEmpty()) {
              ArrayList<String> familyNames = new ArrayList<>();
              for (ColumnFamilyDescriptor family : tableDescriptor.getColumnFamilies()) {
                familyNames.add(family.getNameAsString());
              }
              String msg = "Column Families " + unmatchedFamilies + " specified in "
                + COLUMNS_CONF_KEY + " does not match with any of the table " + tableName
                + " column families " + familyNames + ".\n"
                + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY + "=true.\n";
              usage(msg);
              System.exit(-1);
            }
          }
          if (mapperClass.equals(TsvImporterTextMapper.class)) {
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(TextSortReducer.class);
          } else {
            job.setMapOutputValueClass(Put.class);
            job.setCombinerClass(PutCombiner.class);
            job.setReducerClass(PutSortReducer.class);
          }
          if (!isDryRun) {
            Path outputDir = new Path(hfileOutPath);
            FileOutputFormat.setOutputPath(job, outputDir);
            // The descriptor is only missing here when the strict check above was skipped.
            HFileOutputFormat2.configureIncrementalLoad(job,
              tableDescriptor != null ? tableDescriptor : table.getDescriptor(), regionLocator);
          }
        }
      } else {
        // Direct write: the table and all requested column families must already exist.
        if (!admin.tableExists(tableName)) {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
        try (Table table = connection.getTable(tableName)) {
          Set<String> requestedFamilies = getColumnFamilies(columns);
          ArrayList<String> unmatchedFamilies =
            findUnmatchedFamilies(table.getDescriptor(), requestedFamilies);
          if (!unmatchedFamilies.isEmpty()) {
            String noSuchColumnFamiliesMsg =
              format("Column families: %s do not exist.", unmatchedFamilies);
            LOG.error(noSuchColumnFamiliesMsg);
            throw new NoSuchColumnFamilyException(noSuchColumnFamiliesMsg);
          }
        }
        if (mapperClass.equals(TsvImporterTextMapper.class)) {
          usage(TsvImporterTextMapper.class.toString()
            + " should not be used for non bulkloading case. use "
            + TsvImporterMapper.class.toString() + " or custom mapper whose value type is Put.");
          System.exit(-1);
        }
        if (!isDryRun) {
          // No reducers. Just write straight to table. Call initTableReducerJob
          // to set up the TableOutputFormat.
          TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
        }
        job.setNumReduceTasks(0);
      }
      if (isDryRun) {
        // Neither output setup above ran, so discard the output and append the HBase
        // serializations to the configured list here.
        job.setOutputFormatClass(NullOutputFormat.class);
        job.getConfiguration().setStrings("io.serializations",
          job.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(),
          ResultSerialization.class.getName(), CellSerialization.class.getName());
      }
      TableMapReduceUtil.addDependencyJars(job);
      // Guava used by TsvParser
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Function.class);
      return job;
    }
  }

  /**
   * Collects the entries of {@code families} that {@code tableDescriptor} does not contain, in the
   * iteration order of {@code families}.
   * @param tableDescriptor descriptor of the table to check against
   * @param families        column family names requested for the import
   * @return the requested families missing from the table; empty when all are present
   */
  private static ArrayList<String> findUnmatchedFamilies(TableDescriptor tableDescriptor,
    Set<String> families) {
    ArrayList<String> unmatched = new ArrayList<>();
    for (String family : families) {
      if (!tableDescriptor.hasColumnFamily(Bytes.toBytes(family))) {
        unmatched.add(family);
      }
    }
    return unmatched;
  }