in odps-sqoop/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java [271:444]
public void configureHCat(final SqoopOptions opts, final Job job,
    final ConnManager connMgr, final String dbTable,
    final Configuration config) throws IOException {
  if (configured) {
    LOG.info("Ignoring configuration request for HCatalog info");
    return;
  }
  options = opts;
  checkHomeDirs(opts);
  connManager = connMgr;
  dbTableName = dbTable;
  configuration = config;
  hCatJob = job;
  hCatDatabaseName = options.getHCatDatabaseName() != null
      ? options.getHCatDatabaseName() : DEFHCATDB;
  hCatDatabaseName = hCatDatabaseName.toLowerCase();
  String optHCTabName = options.getHCatTableName();
  hCatTableName = optHCTabName.toLowerCase();
  if (!hCatTableName.equals(optHCTabName)) {
    LOG.warn("Provided HCatalog table name " + optHCTabName
        + " will be mapped to " + hCatTableName);
  }
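
  // HCatalog identifiers are case-insensitive, so the fully qualified
  // <database>.<table> name is built from the lower-cased parts. If a
  // metastore Kerberos principal is set as a system property, propagate it
  // into the job configuration so the HCatalog client can authenticate.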
  StringBuilder sb = new StringBuilder();
  sb.append(hCatDatabaseName);
  sb.append('.').append(hCatTableName);
  hCatQualifiedTableName = sb.toString();
  String principalID = System
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
  if (principalID != null) {
    configuration.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
  }
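
  // Collect the static partition specification: either the comma-separated
  // lists from getHCatalogPartitionKeys()/getHCatalogPartitionValues(), or
  // the single getHivePartitionKey()/getHivePartitionValue() pair.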
  hCatStaticPartitionKeys = new ArrayList<String>();
  hCatStaticPartitionValues = new ArrayList<String>();
  String partKeysString = options.getHCatalogPartitionKeys();
  String partKeysVals = options.getHCatalogPartitionValues();
  // Already validated
  if (partKeysString != null) {
    String[] keys = partKeysString.split(",");
    for (int i = 0; i < keys.length; ++i) {
      String k = keys[i].trim();
      hCatStaticPartitionKeys.add(k);
    }
    String[] vals = partKeysVals.split(",");
    for (int i = 0; i < vals.length; ++i) {
      String v = vals[i].trim();
      hCatStaticPartitionValues.add(v);
    }
  } else {
    partKeysString = options.getHivePartitionKey();
    if (partKeysString != null) {
      hCatStaticPartitionKeys.add(partKeysString);
    }
    partKeysVals = options.getHivePartitionValue();
    // Guard against a null value so the key and value lists stay aligned
    // when no Hive partition is specified.
    if (partKeysVals != null) {
      hCatStaticPartitionValues.add(partKeysVals);
    }
  }
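
  // Copy any user-supplied Hive column type overrides (map-column-hive)
  // into a case-insensitive map keyed by column name.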
  Properties userMapping = options.getMapColumnHive();
  userHiveMapping = new LCKeyMap<String>();
  for (Object o : userMapping.keySet()) {
    String v = (String) userMapping.get(o);
    userHiveMapping.put((String) o, v);
  }
  // Get the partition key filter if needed
  Map<String, String> filterMap = getHCatSPFilterMap();
  String filterStr = getHCatSPFilterStr();
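
  // With an HdfsManager there is no database to introspect, so the column
  // list comes straight from options.getColumns(); for other connection
  // managers, initDBColumnInfo() populates the column metadata from the
  // database.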
  if (connManager instanceof HdfsManager) {
    String[] colNames = options.getColumns();
    dbColumnNames = new String[colNames.length];
    for (int i = 0; i < colNames.length; ++i) {
      dbColumnNames[i] = colNames[i].toLowerCase();
    }
  } else {
    initDBColumnInfo();
  }
  if (options.doCreateHCatalogTable()) {
    LOG.info("Creating HCatalog table " + hCatQualifiedTableName
        + " for import");
    createHCatTable();
  }
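
  // The partition filter derived from the static partition keys above is
  // applied to HCatInputFormat so that only matching partitions are read.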
  // For serializing the schema to conf
  HCatInputFormat hif = HCatInputFormat.setInput(hCatJob, hCatDatabaseName,
      hCatTableName);
  if (filterStr != null) {
    LOG.info("Setting hCatInputFormat filter to " + filterStr);
    hif.setFilter(filterStr);
  }
  hCatFullTableSchema = HCatInputFormat.getTableSchema(configuration);
  hCatFullTableSchemaFieldNames = hCatFullTableSchema.getFieldNames();
  LOG.info("HCatalog full table schema fields = "
      + Arrays.toString(hCatFullTableSchema.getFieldNames().toArray()));
  if (filterMap != null) {
    LOG.info("Setting hCatOutputFormat filter to " + filterStr);
  }
  HCatOutputFormat.setOutput(hCatJob,
      OutputJobInfo.create(hCatDatabaseName, hCatTableName, filterMap));
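
  // The output schema returned by HCatOutputFormat covers only the data
  // columns; any trailing fields in the full table schema are the table's
  // partition columns.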
  hCatOutputSchema = HCatOutputFormat.getTableSchema(configuration);
  List<HCatFieldSchema> hCatPartitionSchemaFields =
      new ArrayList<HCatFieldSchema>();
  int totalFieldsCount = hCatFullTableSchema.size();
  int dataFieldsCount = hCatOutputSchema.size();
  if (totalFieldsCount > dataFieldsCount) {
    for (int i = dataFieldsCount; i < totalFieldsCount; ++i) {
      hCatPartitionSchemaFields.add(hCatFullTableSchema.get(i));
    }
  }
  hCatPartitionSchema = new HCatSchema(hCatPartitionSchemaFields);
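
  // Only string-typed partition columns are supported; reject the table if
  // any partition key uses another type.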
  for (HCatFieldSchema hfs : hCatPartitionSchemaFields) {
    if (hfs.getType() != HCatFieldSchema.Type.STRING) {
      throw new IOException("The table provided "
          + getQualifiedHCatTableName()
          + " uses unsupported partitioning key type for column "
          + hfs.getName() + " : " + hfs.getTypeString() + ". Only string "
          + "fields are allowed in partition columns in HCatalog");
    }
  }
  LOG.info("HCatalog table partitioning key fields = "
      + Arrays.toString(hCatPartitionSchema.getFieldNames().toArray()));
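
  // Build the projected schema: every database column must exist in the
  // HCatalog table, and columns used as static partition keys are skipped
  // because their values come from the partition spec rather than the data.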
  List<HCatFieldSchema> outputFieldList = new ArrayList<HCatFieldSchema>();
  for (String col : dbColumnNames) {
    try {
      HCatFieldSchema hfs = hCatFullTableSchema.get(col);
      if (hfs == null) {
        throw new IOException("Database column " + col + " not found in "
            + "hcatalog table.");
      }
    } catch (Exception e) {
      throw new IOException("Caught Exception checking database column "
          + col + " in hcatalog table.", e);
    }
    boolean skip = false;
    if (hCatStaticPartitionKeys != null) {
      for (String key : hCatStaticPartitionKeys) {
        if (col.equals(key)) {
          skip = true;
          break;
        }
      }
    }
    if (skip) {
      continue;
    }
    outputFieldList.add(hCatFullTableSchema.get(col));
  }
  projectedSchema = new HCatSchema(outputFieldList);
  LOG.info("HCatalog projected schema fields = "
      + Arrays.toString(projectedSchema.getFieldNames().toArray()));
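
  // Run the remaining validations (static partition keys, HCatalog field
  // types), serialize the full table schema for HCatOutputFormat, ship the
  // required HCatalog jars with the job, and propagate the mapper debug
  // flags from system properties into the job configuration.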
  validateStaticPartitionKey();
  validateHCatTableFieldTypes();
  HCatOutputFormat.setSchema(configuration, hCatFullTableSchema);
  addJars(hCatJob, options);
  config.setBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP,
      Boolean.getBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP));
  config.setBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP,
      Boolean.getBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP));
  configured = true;
}