in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java [120:290]
/**
 * Generates a Hive DDL statement to create an external table (ORC-backed by default) for the
 * given Avro schema.
 *
 * <p>Column selection rules:
 * <ul>
 *   <li>If schema evolution is enabled, or the destination table does not exist yet, columns are
 *       derived from the new Avro {@code schema} (when the table already exists, an ALTER TABLE
 *       is issued before data moves from staging to final table).</li>
 *   <li>If evolution is disabled and the destination table exists, columns are taken from the
 *       destination table's schema instead.</li>
 * </ul>
 *
 * @param schema Avro schema to derive the table columns from; must not be null.
 * @param tblName name of the table to create; must not be blank.
 * @param tblLocation filesystem location of the external table's data; must not be blank.
 * @param optionalDbName database to create the table in; defaults to {@code DEFAULT_DB_NAME}.
 * @param optionalPartitionDDLInfo partition column name to Hive type mapping for the
 *        PARTITIONED BY clause; absent or empty means an unpartitioned table.
 * @param optionalClusterInfo column names for the CLUSTERED BY clause; requires
 *        {@code optionalNumOfBuckets} to also be present, and each column must exist in
 *        {@code hiveColumns}.
 * @param optionalSortOrderInfo column name to sort order mapping for the SORTED BY clause; only
 *        meaningful together with {@code optionalClusterInfo}. A present-but-empty map is
 *        treated the same as absent.
 * @param optionalNumOfBuckets bucket count for {@code CLUSTERED BY ... INTO n BUCKETS}.
 * @param optionalRowFormatSerde SerDe class name; defaults to {@code DEFAULT_ROW_FORMAT_SERDE}.
 * @param optionalInputFormat input format class name; defaults to
 *        {@code DEFAULT_ORC_INPUT_FORMAT}.
 * @param optionalOutputFormat output format class name; defaults to
 *        {@code DEFAULT_ORC_OUTPUT_FORMAT}.
 * @param tableProperties extra TBLPROPERTIES; normalized through {@code getTableProperties}.
 *        If it names a schema source-of-truth attribute, that attribute is set to the sanitized
 *        Avro schema string.
 * @param isEvolutionEnabled whether schema evolution is enabled for the destination table.
 * @param casePreserved if true, ORC "columns"/"columns.types" table properties are generated
 *        from the Avro schema to preserve column-name casing.
 * @param destinationTableMeta metadata of the existing destination table, if any.
 * @param hiveColumns mapping of Hive column names to types; populated by the column-mapping
 *        generation and used to validate CLUSTERED BY / SORTED BY column names.
 * @return the complete CREATE EXTERNAL TABLE DDL string.
 * @throws IllegalArgumentException if {@code tblName}/{@code tblLocation} is blank, CLUSTERED BY
 *         is requested without a bucket count, a clustering/sorting column is missing from the
 *         schema, or SORTED BY is requested without CLUSTERED BY.
 * @throws RuntimeException wrapping a {@code SerDeException} if ORC schema properties cannot be
 *         derived when {@code casePreserved} is set.
 */
public static String generateCreateTableDDL(Schema schema,
    String tblName,
    String tblLocation,
    Optional<String> optionalDbName,
    Optional<Map<String, String>> optionalPartitionDDLInfo,
    Optional<List<String>> optionalClusterInfo,
    Optional<Map<String, COLUMN_SORT_ORDER>> optionalSortOrderInfo,
    Optional<Integer> optionalNumOfBuckets,
    Optional<String> optionalRowFormatSerde,
    Optional<String> optionalInputFormat,
    Optional<String> optionalOutputFormat,
    Properties tableProperties,
    boolean isEvolutionEnabled,
    boolean casePreserved,
    Optional<Table> destinationTableMeta,
    Map<String, String> hiveColumns) {
  Preconditions.checkNotNull(schema);
  Preconditions.checkArgument(StringUtils.isNotBlank(tblName));
  Preconditions.checkArgument(StringUtils.isNotBlank(tblLocation));

  String dbName = optionalDbName.isPresent() ? optionalDbName.get() : DEFAULT_DB_NAME;
  String rowFormatSerde = optionalRowFormatSerde.isPresent() ? optionalRowFormatSerde.get() : DEFAULT_ROW_FORMAT_SERDE;
  String inputFormat = optionalInputFormat.isPresent() ? optionalInputFormat.get() : DEFAULT_ORC_INPUT_FORMAT;
  String outputFormat = optionalOutputFormat.isPresent() ? optionalOutputFormat.get() : DEFAULT_ORC_OUTPUT_FORMAT;
  tableProperties = getTableProperties(tableProperties);

  // Start building Hive DDL
  // Refer to Hive DDL manual for explanation of clauses:
  // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/TruncateTable
  StringBuilder ddl = new StringBuilder();

  // Create statement and opening bracket of the column list.
  ddl.append(String.format("CREATE EXTERNAL TABLE IF NOT EXISTS `%s`.`%s` ", dbName, tblName));
  ddl.append("( \n");

  // SCHEMA_SOURCE_OF_TRUTH holds the *name* of the table attribute that should carry the schema
  // literal; write the sanitized schema there and drop the indirection key. Falls back to the
  // default attribute pair used in Hive for ORC format when the key is absent.
  if (tableProperties.containsKey(SCHEMA_SOURCE_OF_TRUTH)) {
    tableProperties.setProperty(tableProperties.getProperty(SCHEMA_SOURCE_OF_TRUTH),
        sanitizeSchemaString(schema.toString()));
    tableProperties.remove(SCHEMA_SOURCE_OF_TRUTH);
  }

  // Column list: use the new (source) schema when evolution is enabled or the destination table
  // does not exist; otherwise reuse the destination table's columns (evolution disabled).
  if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
    log.info("Generating DDL using source schema");
    ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true, dbName + "." + tblName));
    if (casePreserved) {
      try {
        Pair<String, String> orcSchemaProps = HiveConverterUtils.getORCSchemaPropsFromAvroSchema(schema);
        tableProperties.setProperty("columns", orcSchemaProps.getLeft());
        tableProperties.setProperty("columns.types", orcSchemaProps.getRight());
      } catch (SerDeException e) {
        log.error("Cannot generate add partition DDL due to ", e);
        throw new RuntimeException(e);
      }
    }
  } else {
    log.info("Generating DDL using destination schema");
    ddl.append(generateDestinationToHiveColumnMapping(Optional.of(hiveColumns), destinationTableMeta.get()));
  }

  // Close bracket of the column list.
  ddl.append(") \n");

  appendPartitionInfo(ddl, optionalPartitionDDLInfo);
  appendClusterAndSortInfo(ddl, optionalClusterInfo, optionalSortOrderInfo, optionalNumOfBuckets, hiveColumns,
      dbName, tblName);

  // Field terminal / storage formats.
  ddl.append("ROW FORMAT SERDE \n");
  ddl.append(String.format(" '%s' %n", rowFormatSerde));
  ddl.append("STORED AS INPUTFORMAT \n");
  ddl.append(String.format(" '%s' %n", inputFormat));
  ddl.append("OUTPUTFORMAT \n");
  ddl.append(String.format(" '%s' %n", outputFormat));

  // Location of the external table's data.
  ddl.append("LOCATION \n");
  ddl.append(String.format(" '%s' %n", tblLocation));

  appendTableProperties(ddl, tableProperties);

  return ddl.toString();
}

/** Appends the PARTITIONED BY clause when any partition columns were supplied. */
private static void appendPartitionInfo(StringBuilder ddl, Optional<Map<String, String>> optionalPartitionDDLInfo) {
  if (!optionalPartitionDDLInfo.isPresent() || optionalPartitionDDLInfo.get().size() == 0) {
    return;
  }
  ddl.append("PARTITIONED BY ( ");
  boolean isFirst = true;
  for (Map.Entry<String, String> partitionInfo : optionalPartitionDDLInfo.get().entrySet()) {
    if (isFirst) {
      isFirst = false;
    } else {
      ddl.append(", ");
    }
    ddl.append(String.format("`%s` %s", partitionInfo.getKey(), partitionInfo.getValue()));
  }
  ddl.append(" ) \n");
}

/**
 * Appends the CLUSTERED BY / SORTED BY / INTO n BUCKETS clauses. Validates that every
 * referenced column exists in {@code hiveColumns}, that a bucket count accompanies
 * CLUSTERED BY, and that SORTED BY is not requested without CLUSTERED BY.
 *
 * <p>Note: a present-but-empty sort-order map is uniformly treated as "no sort order
 * requested" (previously it threw when CLUSTERED BY was absent, but was silently ignored
 * when CLUSTERED BY was present).
 */
private static void appendClusterAndSortInfo(StringBuilder ddl,
    Optional<List<String>> optionalClusterInfo,
    Optional<Map<String, COLUMN_SORT_ORDER>> optionalSortOrderInfo,
    Optional<Integer> optionalNumOfBuckets,
    Map<String, String> hiveColumns,
    String dbName,
    String tblName) {
  if (!optionalClusterInfo.isPresent()) {
    if (optionalSortOrderInfo.isPresent() && optionalSortOrderInfo.get().size() > 0) {
      throw new IllegalArgumentException(
          String.format("SORTED BY requested, but no CLUSTERED BY specified for table %s.%s", dbName, tblName));
    }
    return;
  }
  if (!optionalNumOfBuckets.isPresent()) {
    throw new IllegalArgumentException(
        (String.format("CLUSTERED BY requested, but no NUM_BUCKETS specified for table %s.%s", dbName, tblName)));
  }
  ddl.append("CLUSTERED BY ( ");
  boolean isFirst = true;
  for (String clusterByCol : optionalClusterInfo.get()) {
    if (!hiveColumns.containsKey(clusterByCol)) {
      throw new IllegalArgumentException(String.format("Requested CLUSTERED BY column: %s "
          + "is not present in schema for table %s.%s", clusterByCol, dbName, tblName));
    }
    if (isFirst) {
      isFirst = false;
    } else {
      ddl.append(", ");
    }
    ddl.append(String.format("`%s`", clusterByCol));
  }
  ddl.append(" ) ");
  if (optionalSortOrderInfo.isPresent() && optionalSortOrderInfo.get().size() > 0) {
    ddl.append("SORTED BY ( ");
    isFirst = true;
    for (Map.Entry<String, COLUMN_SORT_ORDER> sortOrderInfo : optionalSortOrderInfo.get().entrySet()) {
      if (!hiveColumns.containsKey(sortOrderInfo.getKey())) {
        throw new IllegalArgumentException(String.format(
            "Requested SORTED BY column: %s " + "is not present in schema for table %s.%s", sortOrderInfo.getKey(),
            dbName, tblName));
      }
      if (isFirst) {
        isFirst = false;
      } else {
        ddl.append(", ");
      }
      ddl.append(String.format("`%s` %s", sortOrderInfo.getKey(), sortOrderInfo.getValue()));
    }
    ddl.append(" ) ");
  }
  ddl.append(String.format(" INTO %s BUCKETS %n", optionalNumOfBuckets.get()));
}

/** Appends the TBLPROPERTIES clause when any table properties are present. */
private static void appendTableProperties(StringBuilder ddl, Properties tableProperties) {
  if (null == tableProperties || tableProperties.size() == 0) {
    return;
  }
  ddl.append("TBLPROPERTIES ( \n");
  boolean isFirst = true;
  for (String property : tableProperties.stringPropertyNames()) {
    if (isFirst) {
      isFirst = false;
    } else {
      ddl.append(", \n");
    }
    ddl.append(String.format(" '%s'='%s'", property, tableProperties.getProperty(property)));
  }
  ddl.append(") \n");
}