public static String generateTableMappingDML()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java [485:645]


  /**
   * Builds the Hive DML statement that copies rows from the input table into the output table.
   *
   * <p>The generated statement has the shape
   * {@code INSERT (OVERWRITE|INTO) TABLE `db`.`tbl` [PARTITION (...)] [IF NOT EXISTS] SELECT <columns>
   * FROM `db`.`tbl` [WHERE <partition predicates>] [LIMIT n]}.
   * See the Hive DML manual for an explanation of the clauses:
   * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
   *
   * <p>Column selection:
   * <ol>
   *   <li>Evolution enabled, or no destination table metadata supplied: columns are taken from
   *       {@code outputOrcSchema} (for an existing destination with evolution enabled, the original
   *       author notes that an alter table is used before data moves from staging to the final table).</li>
   *   <li>Evolution disabled and destination table metadata supplied: columns are taken from the
   *       destination table; destination columns that cannot be mapped to a source column are skipped.</li>
   * </ol>
   *
   * @param inputAvroSchema schema of the input table; must not be null (only validated and debug-logged here)
   * @param outputOrcSchema schema whose fields drive the select list; a non-blank {@code flatten_source}
   *                        field property is used as the source column path instead of the field name
   * @param inputTblName name of the table to select from; must not be blank
   * @param outputTblName name of the table to insert into; must not be blank
   * @param optionalInputDbName database of the input table; {@code DEFAULT_DB_NAME} when absent
   * @param optionalOutputDbName database of the output table; {@code DEFAULT_DB_NAME} when absent
   * @param optionalPartitionDMLInfo partition column to value map; when present and non-empty it is emitted
   *                                 both as the {@code PARTITION} spec and as the {@code WHERE} predicates
   * @param optionalOverwriteTable whether to use {@code INSERT OVERWRITE}; defaults to true when absent
   * @param optionalCreateIfNotExists whether to emit {@code IF NOT EXISTS}; defaults to false when absent
   * @param isEvolutionEnabled whether schema evolution is enabled (see column selection above)
   * @param destinationTableMeta metadata of the existing destination table, if any
   * @param rowLimit optional row limit, emitted as a trailing {@code LIMIT} clause
   * @return the generated DML query
   */
  public static String generateTableMappingDML(Schema inputAvroSchema,
      Schema outputOrcSchema,
      String inputTblName,
      String outputTblName,
      Optional<String> optionalInputDbName,
      Optional<String> optionalOutputDbName,
      Optional<Map<String, String>> optionalPartitionDMLInfo,
      Optional<Boolean> optionalOverwriteTable,
      Optional<Boolean> optionalCreateIfNotExists,
      boolean isEvolutionEnabled,
      Optional<Table> destinationTableMeta,
      Optional<Integer> rowLimit) {
    Preconditions.checkNotNull(inputAvroSchema);
    Preconditions.checkNotNull(outputOrcSchema);
    Preconditions.checkArgument(StringUtils.isNotBlank(inputTblName));
    Preconditions.checkArgument(StringUtils.isNotBlank(outputTblName));

    String inputDbName = optionalInputDbName.isPresent() ? optionalInputDbName.get() : DEFAULT_DB_NAME;
    String outputDbName = optionalOutputDbName.isPresent() ? optionalOutputDbName.get() : DEFAULT_DB_NAME;
    boolean shouldOverwriteTable = !optionalOverwriteTable.isPresent() || optionalOverwriteTable.get();
    boolean shouldCreateIfNotExists = optionalCreateIfNotExists.isPresent() && optionalCreateIfNotExists.get();
    boolean hasPartitions = optionalPartitionDMLInfo.isPresent() && !optionalPartitionDMLInfo.get().isEmpty();

    // Serializing a schema to a string can be costly; only do it when the output is actually logged
    if (log.isDebugEnabled()) {
      log.debug("Input Schema: " + inputAvroSchema.toString());
      log.debug("Output Schema: " + outputOrcSchema.toString());
    }

    StringBuilder dmlQuery = new StringBuilder();

    // Insert clause
    String insertClause =
        shouldOverwriteTable ? "INSERT OVERWRITE TABLE `%s`.`%s` %n" : "INSERT INTO TABLE `%s`.`%s` %n";
    dmlQuery.append(String.format(insertClause, outputDbName, outputTblName));

    // Target partition spec
    if (hasPartitions) {
      dmlQuery.append("PARTITION (");
      appendPartitionSpecs(dmlQuery, optionalPartitionDMLInfo.get(), ", ");
      dmlQuery.append(") \n");
    }

    // NOTE(review): the Hive DML grammar appears to allow IF NOT EXISTS only after a PARTITION spec of an
    // INSERT OVERWRITE; it is emitted here whenever requested, as callers may rely on it - confirm.
    if (shouldCreateIfNotExists) {
      dmlQuery.append(" IF NOT EXISTS \n");
    }

    // Select list
    dmlQuery.append("SELECT \n");
    StringBuilder selectColumns = new StringBuilder();
    if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
      log.info("Generating DML using source schema");
      for (Schema.Field field : outputOrcSchema.getFields()) {
        appendSelectColumn(selectColumns, flattenSourceOrName(field));
      }
    } else {
      log.info("Generating DML using destination schema");
      for (FieldSchema field : destinationTableMeta.get().getSd().getCols()) {
        String colName = resolveDestinationColumnSource(field, outputOrcSchema);
        // colName can be blank if it is deleted in new evolved schema, so we shouldn't try to fetch it
        if (StringUtils.isNotBlank(colName)) {
          appendSelectColumn(selectColumns, colName);
        }
      }
    }
    dmlQuery.append(selectColumns);

    dmlQuery.append(String.format(" %n FROM `%s`.`%s` ", inputDbName, inputTblName));

    // Restrict the source rows to the same partition that is being written
    if (hasPartitions) {
      dmlQuery.append("WHERE ");
      appendPartitionSpecs(dmlQuery, optionalPartitionDMLInfo.get(), " AND ");
      dmlQuery.append(" \n");
    }

    // Limit clause
    if (rowLimit.isPresent()) {
      dmlQuery.append(String.format("LIMIT %s", rowLimit.get()));
    }

    return dmlQuery.toString();
  }

  /**
   * Appends every partition entry as {@code `key`='value'}, joined by {@code separator}.
   */
  private static void appendPartitionSpecs(StringBuilder query, Map<String, String> partitions,
      String separator) {
    boolean isFirstPartitionSpec = true;
    for (Map.Entry<String, String> partition : partitions.entrySet()) {
      if (isFirstPartitionSpec) {
        isFirstPartitionSpec = false;
      } else {
        query.append(separator);
      }
      // NOTE(review): values are interpolated as-is; a value containing a single quote would break the
      // literal. Escaping is not added here because callers may already pass escaped values - confirm.
      query.append(String.format("`%s`='%s'", partition.getKey(), partition.getValue()));
    }
  }

  /**
   * Appends one back-quoted column to the select list, preceded by a separator when it is not the first.
   * Dots in {@code colName} are turned into {@code `.`} so each path segment is quoted on its own.
   */
  private static void appendSelectColumn(StringBuilder selectColumns, String colName) {
    if (selectColumns.length() > 0) {
      selectColumns.append(", \n");
    }
    selectColumns.append(String.format("  `%s`", colName.replace(".", "`.`")));
  }

  /**
   * Returns the field's {@code flatten_source} property when it is non-blank, otherwise the field name.
   */
  private static String flattenSourceOrName(Schema.Field field) {
    String flattenSource = field.getProp("flatten_source");
    return StringUtils.isNotBlank(flattenSource) ? flattenSource : field.name();
  }

  /**
   * Finds the source column for a destination table column: first from a column comment of the form
   * {@code from flatten_source <path>}, otherwise from the field of {@code outputOrcSchema} whose name
   * matches case-insensitively. Returns an empty string when neither yields a column.
   */
  private static String resolveDestinationColumnSource(FieldSchema field, Schema outputOrcSchema) {
    String flattenSourcePrefix = "from flatten_source ";
    if (field.isSetComment() && field.getComment().startsWith(flattenSourcePrefix)) {
      // Strip only the leading marker; the remainder of the comment is the source column path
      return field.getComment().substring(flattenSourcePrefix.length()).trim();
    }
    for (Schema.Field evolvedField : outputOrcSchema.getFields()) {
      if (evolvedField.name().equalsIgnoreCase(field.getName())) {
        return flattenSourceOrName(evolvedField);
      }
    }
    return StringUtils.EMPTY;
  }