public static String generateCreateTableDDL()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java [120:290]

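Generates the Hive DDL for a CREATE EXTERNAL TABLE statement that registers Avro-derived data as an ORC-backed table: column definitions come from either the source Avro schema or the existing destination table (depending on whether schema evolution is enabled), followed by optional partition, clustering, sort-order, bucketing, SerDe, input/output format, location, and table-property clauses.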

  public static String generateCreateTableDDL(Schema schema,
      String tblName,
      String tblLocation,
      Optional<String> optionalDbName,
      Optional<Map<String, String>> optionalPartitionDDLInfo,
      Optional<List<String>> optionalClusterInfo,
      Optional<Map<String, COLUMN_SORT_ORDER>> optionalSortOrderInfo,
      Optional<Integer> optionalNumOfBuckets,
      Optional<String> optionalRowFormatSerde,
      Optional<String> optionalInputFormat,
      Optional<String> optionalOutputFormat,
      Properties tableProperties,
      boolean isEvolutionEnabled,
      boolean casePreserved,
      Optional<Table> destinationTableMeta,
      Map<String, String> hiveColumns) {

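    // Fail fast on missing required arguments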
    Preconditions.checkNotNull(schema);
    Preconditions.checkArgument(StringUtils.isNotBlank(tblName));
    Preconditions.checkArgument(StringUtils.isNotBlank(tblLocation));

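    // Resolve optional parameters to their defaults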
    String dbName = optionalDbName.isPresent() ? optionalDbName.get() : DEFAULT_DB_NAME;
    String rowFormatSerde = optionalRowFormatSerde.isPresent() ? optionalRowFormatSerde.get() : DEFAULT_ROW_FORMAT_SERDE;
    String inputFormat = optionalInputFormat.isPresent() ? optionalInputFormat.get() : DEFAULT_ORC_INPUT_FORMAT;
    String outputFormat = optionalOutputFormat.isPresent() ? optionalOutputFormat.get() : DEFAULT_ORC_OUTPUT_FORMAT;
    tableProperties = getTableProperties(tableProperties);

    // Start building Hive DDL
    // Refer to Hive DDL manual for explanation of clauses:
    // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/TruncateTable
    StringBuilder ddl = new StringBuilder();

    // Create statement
    ddl.append(String.format("CREATE EXTERNAL TABLE IF NOT EXISTS `%s`.`%s` ", dbName, tblName));
    // .. open bracket for CREATE
    ddl.append("( \n");

    // 1. If evolution is enabled and the destination table does not exist:
    //    .. use columns from the new schema
    //    (evolution does not matter for a new destination table)
    // 2. If evolution is enabled and the destination table exists:
    //    .. use columns from the new schema
    //    (ALTER TABLE will be used before moving data from the staging to the final table)
    // 3. If evolution is disabled and the destination table does not exist:
    //    .. use columns from the new schema
    //    (evolution does not matter for a new destination table)
    // 4. If evolution is disabled and the destination table exists:
    //    .. use columns from the destination schema
    // If a schema source-of-truth attribute is configured, store the serialized schema under
    // that attribute name; otherwise fall back to the default attribute pair Hive uses for ORC.
    if (tableProperties.containsKey(SCHEMA_SOURCE_OF_TRUTH)) {
      tableProperties.setProperty(tableProperties.getProperty(SCHEMA_SOURCE_OF_TRUTH), sanitizeSchemaString(schema.toString()));
      tableProperties.remove(SCHEMA_SOURCE_OF_TRUTH);
    }

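    // Note: both column-mapping calls below populate hiveColumns as a side effect;
    // the map is consulted later to validate CLUSTERED BY and SORTED BY columns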
    if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
      log.info("Generating DDL using source schema");
      ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true, dbName + "." + tblName));
      if (casePreserved) {
        try {
          Pair<String, String> orcSchemaProps = HiveConverterUtils.getORCSchemaPropsFromAvroSchema(schema);
          tableProperties.setProperty("columns", orcSchemaProps.getLeft());
          tableProperties.setProperty("columns.types", orcSchemaProps.getRight());
        } catch (SerDeException e) {
          log.error("Cannot generate add partition DDL due to ", e);
          throw new RuntimeException(e);
        }
      }
    } else {
      log.info("Generating DDL using destination schema");
      ddl.append(generateDestinationToHiveColumnMapping(Optional.of(hiveColumns), destinationTableMeta.get()));
    }

    // .. close bracket for CREATE
    ddl.append(") \n");

    // Partition info
    if (optionalPartitionDDLInfo.isPresent() && !optionalPartitionDDLInfo.get().isEmpty()) {
      ddl.append("PARTITIONED BY ( ");
      boolean isFirst = true;
      Map<String, String> partitionInfoMap = optionalPartitionDDLInfo.get();
      for (Map.Entry<String, String> partitionInfo : partitionInfoMap.entrySet()) {
        if (isFirst) {
          isFirst = false;
        } else {
          ddl.append(", ");
        }
        ddl.append(String.format("`%s` %s", partitionInfo.getKey(), partitionInfo.getValue()));
      }
      ddl.append(" ) \n");
    }

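    // Clustering, sort order, and bucketing info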
    if (optionalClusterInfo.isPresent()) {
      if (!optionalNumOfBuckets.isPresent()) {
        throw new IllegalArgumentException(
            String.format("CLUSTERED BY requested, but no NUM_BUCKETS specified for table %s.%s", dbName, tblName));
      }
      ddl.append("CLUSTERED BY ( ");
      boolean isFirst = true;
      for (String clusterByCol : optionalClusterInfo.get()) {
        if (!hiveColumns.containsKey(clusterByCol)) {
          throw new IllegalArgumentException(String.format("Requested CLUSTERED BY column: %s "
              + "is not present in schema for table %s.%s", clusterByCol, dbName, tblName));
        }
        if (isFirst) {
          isFirst = false;
        } else {
          ddl.append(", ");
        }
        ddl.append(String.format("`%s`", clusterByCol));
      }
      ddl.append(" ) ");

      if (optionalSortOrderInfo.isPresent() && !optionalSortOrderInfo.get().isEmpty()) {
        Map<String, COLUMN_SORT_ORDER> sortOrderInfoMap = optionalSortOrderInfo.get();
        ddl.append("SORTED BY ( ");
        isFirst = true;
        for (Map.Entry<String, COLUMN_SORT_ORDER> sortOrderInfo : sortOrderInfoMap.entrySet()){
          if (!hiveColumns.containsKey(sortOrderInfo.getKey())) {
            throw new IllegalArgumentException(String.format(
                "Requested SORTED BY column: %s is not present in schema for table %s.%s",
                sortOrderInfo.getKey(), dbName, tblName));
          }
          if (isFirst) {
            isFirst = false;
          } else {
            ddl.append(", ");
          }
          ddl.append(String.format("`%s` %s", sortOrderInfo.getKey(), sortOrderInfo.getValue()));
        }
        ddl.append(" ) ");
      }
      ddl.append(String.format(" INTO %s BUCKETS %n", optionalNumOfBuckets.get()));
    } else if (optionalSortOrderInfo.isPresent()) {
      throw new IllegalArgumentException(
          String.format("SORTED BY requested, but no CLUSTERED BY specified for table %s.%s", dbName, tblName));
    }

    // Row format SerDe
    ddl.append("ROW FORMAT SERDE \n");
    ddl.append(String.format("  '%s' %n", rowFormatSerde));

    // Stored as ORC
    ddl.append("STORED AS INPUTFORMAT \n");
    ddl.append(String.format("  '%s' %n", inputFormat));
    ddl.append("OUTPUTFORMAT \n");
    ddl.append(String.format("  '%s' %n", outputFormat));

    // Location
    ddl.append("LOCATION \n");
    ddl.append(String.format("  '%s' %n", tblLocation));

    // Table properties
    if (tableProperties != null && !tableProperties.isEmpty()) {
      ddl.append("TBLPROPERTIES ( \n");
      boolean isFirst = true;
      for (String property : tableProperties.stringPropertyNames()) {
        if (isFirst) {
          isFirst = false;
        } else {
          ddl.append(", \n");
        }
        ddl.append(String.format("  '%s'='%s'", property, tableProperties.getProperty(property)));
      }
      ddl.append(") \n");
    }

    return ddl.toString();
  }
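
For orientation, a minimal usage sketch follows. It is illustrative only: it assumes Guava's Optional (the Optional type used throughout this class) and Avro's SchemaBuilder, and the schema, database name, table name, and location are hypothetical placeholders rather than values taken from the source.

// Minimal usage sketch; illustrative only. Assumes Guava's Optional and Avro's
// SchemaBuilder; all names and paths below are hypothetical placeholders.
import java.util.HashMap;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;

import com.google.common.base.Optional;

public class GenerateCreateTableDDLExample {
  public static void main(String[] args) {
    // Toy Avro schema standing in for a real source schema
    Schema schema = SchemaBuilder.record("Event").fields()
        .requiredString("id")
        .requiredLong("timestamp")
        .endRecord();

    // Populated by generateCreateTableDDL as a side effect of column mapping
    HashMap<String, String> hiveColumns = new HashMap<>();

    String ddl = HiveAvroORCQueryGenerator.generateCreateTableDDL(
        schema,
        "events_orc",                  // tblName (hypothetical)
        "/data/warehouse/events_orc",  // tblLocation (hypothetical)
        Optional.of("analytics"),      // optionalDbName; absent() falls back to DEFAULT_DB_NAME
        Optional.absent(),             // no PARTITIONED BY clause
        Optional.absent(),             // no CLUSTERED BY clause
        Optional.absent(),             // no SORTED BY clause
        Optional.absent(),             // no bucket count (only required with CLUSTERED BY)
        Optional.absent(),             // default row format SerDe
        Optional.absent(),             // default ORC input format
        Optional.absent(),             // default ORC output format
        new Properties(),              // no extra table properties
        true,                          // isEvolutionEnabled
        false,                         // casePreserved
        Optional.absent(),             // destinationTableMeta: table does not exist yet
        hiveColumns);

    System.out.println(ddl);
  }
}

Because destinationTableMeta is absent here, the generator takes the source-schema branch regardless of the evolution flag; the flag only changes which schema is used when the destination table already exists.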