in hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java [455:602]
protected static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException, ClassNotFoundException {
  Job job = null;
  boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
  try (Connection connection = ConnectionFactory.createConnection(conf)) {
    try (Admin admin = connection.getAdmin()) {
      // Support characters that XML cannot represent by re-encoding the
      // passed separator as a Base64 string.
      String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
      if (actualSeparator != null) {
        conf.set(SEPARATOR_CONF_KEY,
          Bytes.toString(Base64.getEncoder().encode(Bytes.toBytes(actualSeparator))));
      }
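      // The task side is expected to reverse this encoding: the stock
      // TsvImporterMapper Base64-decodes the separator in its setup(). Custom
      // mappers supplied via MAPPER_CONF_KEY need to do the same.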
      // See if a non-default Mapper was set
      String mapperClassName = conf.get(MAPPER_CONF_KEY);
      Class mapperClass =
        mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;
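      // DEFAULT_MAPPER is the stock TsvImporterMapper, which parses each input
      // line into a Put keyed by the row described in COLUMNS_CONF_KEY.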
      TableName tableName = TableName.valueOf(args[0]);
      Path inputDir = new Path(args[1]);
      String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName.getNameAsString());
      job = Job.getInstance(conf, jobName);
      job.setJarByClass(mapperClass);
      FileInputFormat.setInputPaths(job, inputDir);
      job.setInputFormatClass(TextInputFormat.class);
      job.setMapperClass(mapperClass);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
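      // BULK_OUTPUT_CONF_KEY selects between the two job shapes below: when
      // set, the job writes HFiles for a later bulk load; when absent, the job
      // writes Puts directly to the live table.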
      String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
      String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
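      // Optionally merge delegation tokens from an external token storage file
      // into the job's credentials, e.g. when writing to a secured cluster.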
      if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
        String fileLoc = conf.get(CREDENTIALS_LOCATION);
        Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
        job.getCredentials().addAll(cred);
      }
      if (hfileOutPath != null) {
        if (!admin.tableExists(tableName)) {
          LOG.warn(format("Table '%s' does not exist.", tableName));
          if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
            // TODO: this is backwards. Instead of depending on the existence of a table,
            // create a sane splits file for HFileOutputFormat based on data sampling.
            createTable(admin, tableName, columns);
            if (isDryRun) {
              LOG.warn("Dry run: Table will be deleted at end of dry run.");
              synchronized (ImportTsv.class) {
                DRY_RUN_TABLE_CREATED = true;
              }
            }
          } else {
            String errorMsg = format("Table '%s' does not exist and '%s' is set to no.",
              tableName, CREATE_TABLE_CONF_KEY);
            LOG.error(errorMsg);
            throw new TableNotFoundException(errorMsg);
          }
        }
        try (Table table = connection.getTable(tableName);
          RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
          boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
          // Unless no.strict is set, validate the requested column families
          // against the table descriptor.
          if (!noStrict) {
            ArrayList<String> unmatchedFamilies = new ArrayList<>();
            Set<String> cfSet = getColumnFamilies(columns);
            TableDescriptor tDesc = table.getDescriptor();
            for (String cf : cfSet) {
              if (!tDesc.hasColumnFamily(Bytes.toBytes(cf))) {
                unmatchedFamilies.add(cf);
              }
            }
            if (unmatchedFamilies.size() > 0) {
              ArrayList<String> familyNames = new ArrayList<>();
              for (ColumnFamilyDescriptor family : table.getDescriptor().getColumnFamilies()) {
                familyNames.add(family.getNameAsString());
              }
              String msg = "Column families " + unmatchedFamilies + " specified in "
                + COLUMNS_CONF_KEY + " do not match any of the column families "
                + familyNames + " of table " + tableName + ".\n"
                + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY + "=true.\n";
              usage(msg);
              System.exit(-1);
            }
          }
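          // The reducer must match the mapper's output value type: Text lines
          // are parsed and sorted into KeyValues by TextSortReducer, while Puts
          // are combined and sorted by PutCombiner/PutSortReducer.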
          if (mapperClass.equals(TsvImporterTextMapper.class)) {
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(TextSortReducer.class);
          } else {
            job.setMapOutputValueClass(Put.class);
            job.setCombinerClass(PutCombiner.class);
            job.setReducerClass(PutSortReducer.class);
          }
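          // configureIncrementalLoad wires up HFileOutputFormat2: it installs a
          // TotalOrderPartitioner over the table's current region boundaries
          // and sets the reduce task count to the number of regions, so each
          // reducer produces HFiles for one region. Skipped in a dry run,
          // where no output is written.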
          if (!isDryRun) {
            Path outputDir = new Path(hfileOutPath);
            FileOutputFormat.setOutputPath(job, outputDir);
            HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(),
              regionLocator);
          }
        }
      } else {
        if (!admin.tableExists(tableName)) {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
        try (Table table = connection.getTable(tableName)) {
          ArrayList<String> unmatchedFamilies = new ArrayList<>();
          Set<String> cfSet = getColumnFamilies(columns);
          TableDescriptor tDesc = table.getDescriptor();
          for (String cf : cfSet) {
            if (!tDesc.hasColumnFamily(Bytes.toBytes(cf))) {
              unmatchedFamilies.add(cf);
            }
          }
          if (unmatchedFamilies.size() > 0) {
            String noSuchColumnFamiliesMsg =
              format("Column families: %s do not exist.", unmatchedFamilies);
            LOG.error(noSuchColumnFamiliesMsg);
            throw new NoSuchColumnFamilyException(noSuchColumnFamiliesMsg);
          }
        }
        if (mapperClass.equals(TsvImporterTextMapper.class)) {
          usage(TsvImporterTextMapper.class.toString()
            + " should not be used for the non-bulkload case. Use "
            + TsvImporterMapper.class.toString()
            + " or a custom mapper whose map output value type is Put.");
          System.exit(-1);
        }
        if (!isDryRun) {
          // No reducers. Just write straight to table. Call initTableReducerJob
          // to set up the TableOutputFormat.
          TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
        }
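        // Map-only job: with zero reduce tasks, the mappers' Puts go straight
        // through TableOutputFormat to the table.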
        job.setNumReduceTasks(0);
      }
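      // In a dry run nothing is written anywhere: NullOutputFormat discards the
      // job output, and the HBase serializations (which configureIncrementalLoad
      // or initTableReducerJob would otherwise have registered) are added by
      // hand so Put/Result/Cell values can still cross the map boundary.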
      if (isDryRun) {
        job.setOutputFormatClass(NullOutputFormat.class);
        job.getConfiguration().setStrings("io.serializations",
          job.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(),
          ResultSerialization.class.getName(), CellSerialization.class.getName());
      }
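      // Ship HBase and its transitive dependencies with the job via the
      // distributed cache so task JVMs can load the classes used above.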
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Function.class /* Guava used by TsvParser */);
    }
  }
  return job;
}
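
// A minimal sketch of how this method is typically reached (option names below
// assume the standard ImportTsv configuration keys; the table name and paths
// are illustrative):
//
//   hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
//     -Dimporttsv.columns=HBASE_ROW_KEY,d:c1,d:c2 \
//     -Dimporttsv.bulk.output=/tmp/bulk-out \
//     myTable /input/data.tsv
//
// With importtsv.bulk.output set, the generated HFiles are loaded afterwards
// with completebulkload; omitting it makes the job write Puts directly.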