public void configureHCat()

in odps-sqoop/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java [271:444]


  public void configureHCat(final SqoopOptions opts, final Job job,
    final ConnManager connMgr, final String dbTable,
    final Configuration config) throws IOException {
    if (configured) {
      LOG.info("Ignoring configuration request for HCatalog info");
      return;
    }
    options = opts;
    checkHomeDirs(opts);
    connManager = connMgr;
    dbTableName = dbTable;
    configuration = config;
    hCatJob = job;
    hCatDatabaseName = options.getHCatDatabaseName() != null ? options
      .getHCatDatabaseName() : DEFHCATDB;
    hCatDatabaseName = hCatDatabaseName.toLowerCase();

    String optHCTabName = options.getHCatTableName();
    hCatTableName = optHCTabName.toLowerCase();

    if (!hCatTableName.equals(optHCTabName)) {
      LOG.warn("Provided HCatalog table name " + optHCTabName
        + " will be mapped to  " + hCatTableName);
    }

    StringBuilder sb = new StringBuilder();
    sb.append(hCatDatabaseName);
    sb.append('.').append(hCatTableName);
    hCatQualifiedTableName = sb.toString();

    String principalID = System
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
    if (principalID != null) {
      configuration.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
    }
    hCatStaticPartitionKeys = new ArrayList<String>();
    hCatStaticPartitionValues = new ArrayList<String>();
    String partKeysString = options.getHCatalogPartitionKeys();
    String partKeysVals = options.getHCatalogPartitionValues();
    // Already validated
    if (partKeysString != null) {
      String[] keys = partKeysString.split(",");
      for (int i = 0; i < keys.length; ++i) {
        String k = keys[i].trim();
        hCatStaticPartitionKeys.add(k);
      }
      String[] vals = partKeysVals.split(",");
      for (int i = 0; i < vals.length; ++i) {
        String v = vals[i].trim();
        hCatStaticPartitionValues.add(v);
      }
    } else {
      partKeysString = options.getHivePartitionKey();
      if (partKeysString != null) {
        hCatStaticPartitionKeys.add(partKeysString);
      }
      partKeysVals = options.getHivePartitionValue();
      hCatStaticPartitionValues.add(partKeysVals);

    }
    Properties userMapping = options.getMapColumnHive();
    userHiveMapping = new LCKeyMap<String>();
    for (Object o : userMapping.keySet()) {
      String v = (String) userMapping.get(o);
      userHiveMapping.put((String) o, v);
    }
    // Get the partition key filter if needed
    Map<String, String> filterMap = getHCatSPFilterMap();
    String filterStr = getHCatSPFilterStr();
    if (connManager instanceof HdfsManager) {
      String[] colNames = options.getColumns();
      dbColumnNames = new String[colNames.length];

      for (int i = 0; i < colNames.length; ++i) {
        dbColumnNames[i] = colNames[i].toLowerCase();
      }
    } else {
      initDBColumnInfo();
    }
    if (options.doCreateHCatalogTable()) {
      LOG.info("Creating HCatalog table " + hCatQualifiedTableName
        + " for import");
      createHCatTable();
    }
    // For serializing the schema to conf
    HCatInputFormat hif = HCatInputFormat.setInput(hCatJob, hCatDatabaseName,
      hCatTableName);
    // For serializing the schema to conf
    if (filterStr != null) {
      LOG.info("Setting hCatInputFormat filter to " + filterStr);
      hif.setFilter(filterStr);
    }

    hCatFullTableSchema = HCatInputFormat.getTableSchema(configuration);
    hCatFullTableSchemaFieldNames = hCatFullTableSchema.getFieldNames();

    LOG.info("HCatalog full table schema fields = "
      + Arrays.toString(hCatFullTableSchema.getFieldNames().toArray()));

    if (filterMap != null) {
      LOG.info("Setting hCatOutputFormat filter to " + filterStr);
    }

    HCatOutputFormat.setOutput(hCatJob,
      OutputJobInfo.create(hCatDatabaseName, hCatTableName, filterMap));
    hCatOutputSchema = HCatOutputFormat.getTableSchema(configuration);
    List<HCatFieldSchema> hCatPartitionSchemaFields =
      new ArrayList<HCatFieldSchema>();
    int totalFieldsCount = hCatFullTableSchema.size();
    int dataFieldsCount = hCatOutputSchema.size();
    if (totalFieldsCount > dataFieldsCount) {
      for (int i = dataFieldsCount; i < totalFieldsCount; ++i) {
        hCatPartitionSchemaFields.add(hCatFullTableSchema.get(i));
      }
    }

    hCatPartitionSchema = new HCatSchema(hCatPartitionSchemaFields);
    for (HCatFieldSchema hfs : hCatPartitionSchemaFields) {
      if (hfs.getType() != HCatFieldSchema.Type.STRING) {
        throw new IOException("The table provided "
          + getQualifiedHCatTableName()
          + " uses unsupported  partitioning key type  for column "
          + hfs.getName() + " : " + hfs.getTypeString() + ".  Only string "
          + "fields are allowed in partition columns in HCatalog");
      }

    }
    LOG.info("HCatalog table partitioning key fields = "
      + Arrays.toString(hCatPartitionSchema.getFieldNames().toArray()));

    List<HCatFieldSchema> outputFieldList = new ArrayList<HCatFieldSchema>();
    for (String col : dbColumnNames) {
      try {
        HCatFieldSchema hfs = hCatFullTableSchema.get(col);
        if (hfs == null) {
          throw new IOException("Database column " + col + " not found in "
              + " hcatalog table.");
        }
      } catch (Exception e) {
        throw new IOException("Caught Exception checking database column " + col + " in "
            + " hcatalog table.", e);
      }
      boolean skip=false;
      if (hCatStaticPartitionKeys != null) {
        for (String key : hCatStaticPartitionKeys) {
          if (col.equals(key)) {
            skip=true;
            break;
          }
        }
      }
      if (skip) {
        continue;
      }
      outputFieldList.add(hCatFullTableSchema.get(col));
    }

    projectedSchema = new HCatSchema(outputFieldList);

    LOG.info("HCatalog projected schema fields = "
      + Arrays.toString(projectedSchema.getFieldNames().toArray()));

    validateStaticPartitionKey();
    validateHCatTableFieldTypes();

    HCatOutputFormat.setSchema(configuration, hCatFullTableSchema);

    addJars(hCatJob, options);
    config.setBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP,
      Boolean.getBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP));
    config.setBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP,
      Boolean.getBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP));
    configured = true;
  }