in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java [485:645]
/**
 * Generates the Hive DML (an {@code INSERT ... SELECT}) that copies data from the Avro-backed
 * source table into the ORC-backed destination table, mapping flattened source columns to
 * destination columns.
 *
 * <p>Column selection strategy:
 * <ul>
 *   <li>If schema evolution is enabled, or the destination table does not yet exist, the
 *       projected columns come from the new (output ORC) schema — for a brand-new table
 *       evolution is irrelevant, and for an existing table an ALTER TABLE is issued before
 *       data is moved from staging to final.</li>
 *   <li>If evolution is disabled and the destination table exists, the projected columns come
 *       from the destination table's schema, so columns deleted in the evolved schema are
 *       silently skipped.</li>
 * </ul>
 *
 * <p>See the Hive DML manual for the clauses emitted:
 * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
 *
 * @param inputAvroSchema          Avro schema of the source table (logged only; must be non-null).
 * @param outputOrcSchema          Flattened Avro schema describing the ORC destination columns.
 * @param inputTblName             Source table name (must be non-blank).
 * @param outputTblName            Destination table name (must be non-blank).
 * @param optionalInputDbName      Source DB name; defaults to {@code DEFAULT_DB_NAME} if absent.
 * @param optionalOutputDbName     Destination DB name; defaults to {@code DEFAULT_DB_NAME} if absent.
 * @param optionalPartitionDMLInfo Partition column name-to-value map; when present and non-empty,
 *                                 emitted both as a static PARTITION spec and as a WHERE filter.
 * @param optionalOverwriteTable   Whether to INSERT OVERWRITE (default true) vs INSERT INTO.
 * @param optionalCreateIfNotExists Whether to append IF NOT EXISTS (default false).
 * @param isEvolutionEnabled       Whether schema evolution is enabled (see strategy above).
 * @param destinationTableMeta     Hive metadata of the existing destination table, if any.
 * @param rowLimit                 Optional LIMIT on the number of rows selected.
 * @return the complete Hive DML query string.
 */
public static String generateTableMappingDML(Schema inputAvroSchema,
Schema outputOrcSchema,
String inputTblName,
String outputTblName,
Optional<String> optionalInputDbName,
Optional<String> optionalOutputDbName,
Optional<Map<String, String>> optionalPartitionDMLInfo,
Optional<Boolean> optionalOverwriteTable,
Optional<Boolean> optionalCreateIfNotExists,
boolean isEvolutionEnabled,
Optional<Table> destinationTableMeta,
Optional<Integer> rowLimit) {
  Preconditions.checkNotNull(inputAvroSchema);
  Preconditions.checkNotNull(outputOrcSchema);
  Preconditions.checkArgument(StringUtils.isNotBlank(inputTblName));
  Preconditions.checkArgument(StringUtils.isNotBlank(outputTblName));

  String inputDbName = optionalInputDbName.isPresent() ? optionalInputDbName.get() : DEFAULT_DB_NAME;
  String outputDbName = optionalOutputDbName.isPresent() ? optionalOutputDbName.get() : DEFAULT_DB_NAME;
  boolean shouldOverwriteTable = optionalOverwriteTable.isPresent() ? optionalOverwriteTable.get() : true;
  boolean shouldCreateIfNotExists = optionalCreateIfNotExists.isPresent() ? optionalCreateIfNotExists.get() : false;

  log.debug("Input Schema: " + inputAvroSchema.toString());
  log.debug("Output Schema: " + outputOrcSchema.toString());

  StringBuilder dmlQuery = new StringBuilder();

  // Insert clause
  if (shouldOverwriteTable) {
    dmlQuery.append(String.format("INSERT OVERWRITE TABLE `%s`.`%s` %n", outputDbName, outputTblName));
  } else {
    dmlQuery.append(String.format("INSERT INTO TABLE `%s`.`%s` %n", outputDbName, outputTblName));
  }

  // Static partition spec, e.g. PARTITION (`dt`='2016-01-01')
  if (optionalPartitionDMLInfo.isPresent()) {
    appendPartitionClause(dmlQuery, optionalPartitionDMLInfo.get(), "PARTITION (", ", ", ") \n");
  }

  if (shouldCreateIfNotExists) {
    dmlQuery.append(" IF NOT EXISTS \n");
  }

  // Select clause: project columns per the strategy documented on this method.
  dmlQuery.append("SELECT \n");
  if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
    log.info("Generating DML using source schema");
    appendColumnsFromSourceSchema(dmlQuery, outputOrcSchema);
  } else {
    log.info("Generating DML using destination schema");
    appendColumnsFromDestinationSchema(dmlQuery, outputOrcSchema, destinationTableMeta.get());
  }

  dmlQuery.append(String.format(" %n FROM `%s`.`%s` ", inputDbName, inputTblName));

  // Restrict the scan to the partition being processed.
  if (optionalPartitionDMLInfo.isPresent()) {
    appendPartitionClause(dmlQuery, optionalPartitionDMLInfo.get(), "WHERE ", " AND ", " \n");
  }

  // Limit clause
  if (rowLimit.isPresent()) {
    dmlQuery.append(String.format("LIMIT %s", rowLimit.get()));
  }

  return dmlQuery.toString();
}

/**
 * Appends a rendering of the partition name/value pairs, each as {@code `name`='value'},
 * wrapped in {@code prefix}/{@code suffix} and joined by {@code separator}. Appends nothing
 * when the map is empty.
 *
 * <p>NOTE(review): partition values are interpolated into the query unescaped; values
 * containing a single quote would break (or alter) the generated DML — confirm upstream
 * sanitization.
 */
private static void appendPartitionClause(StringBuilder dmlQuery, Map<String, String> partitionsDMLInfo,
    String prefix, String separator, String suffix) {
  if (partitionsDMLInfo.size() == 0) {
    return;
  }
  dmlQuery.append(prefix);
  boolean isFirstPartitionSpec = true;
  for (Map.Entry<String, String> partition : partitionsDMLInfo.entrySet()) {
    if (!isFirstPartitionSpec) {
      dmlQuery.append(separator);
    }
    isFirstPartitionSpec = false;
    dmlQuery.append(String.format("`%s`='%s'", partition.getKey(), partition.getValue()));
  }
  dmlQuery.append(suffix);
}

/**
 * Appends the SELECT column list taken from the new (output ORC) schema. Each column resolves
 * to its {@code flatten_source} property when set (the fully qualified source path), otherwise
 * to the field's own name.
 */
private static void appendColumnsFromSourceSchema(StringBuilder dmlQuery, Schema outputOrcSchema) {
  boolean isFirst = true;
  for (Schema.Field field : outputOrcSchema.getFields()) {
    String flattenSource = field.getProp("flatten_source");
    String colName = StringUtils.isNotBlank(flattenSource) ? flattenSource : field.name();
    if (!isFirst) {
      dmlQuery.append(", \n");
    }
    isFirst = false;
    dmlQuery.append(String.format(" `%s`", escapeColumnName(colName)));
  }
}

/**
 * Appends the SELECT column list taken from the existing destination table's schema. The source
 * path for each destination column is recovered either from the column comment (written as
 * {@code from flatten_source <path>} at table-creation time) or, failing that, by locating the
 * same-named field in the evolved schema. Columns that resolve to nothing (i.e. were deleted in
 * the evolved schema) are skipped.
 */
private static void appendColumnsFromDestinationSchema(StringBuilder dmlQuery, Schema outputOrcSchema,
    Table destinationTable) {
  final String flattenSourceCommentPrefix = "from flatten_source ";
  boolean isFirst = true;
  for (FieldSchema field : destinationTable.getSd().getCols()) {
    String colName = StringUtils.EMPTY;
    if (field.isSetComment() && field.getComment().startsWith(flattenSourceCommentPrefix)) {
      // Strip exactly the leading marker; a blanket replaceAll would also clobber any later
      // occurrence of the phrase inside the recorded path.
      colName = field.getComment().substring(flattenSourceCommentPrefix.length()).trim();
    } else {
      // Fall back to the evolved schema: match by name, then prefer its flatten_source property.
      for (Schema.Field evolvedField : outputOrcSchema.getFields()) {
        if (evolvedField.name().equalsIgnoreCase(field.getName())) {
          String flattenSource = evolvedField.getProp("flatten_source");
          colName = StringUtils.isNotBlank(flattenSource) ? flattenSource : evolvedField.name();
          break;
        }
      }
    }
    // colName can be blank if it is deleted in new evolved schema, so we shouldn't try to fetch it
    if (StringUtils.isNotBlank(colName)) {
      if (!isFirst) {
        dmlQuery.append(", \n");
      }
      isFirst = false;
      dmlQuery.append(String.format(" `%s`", escapeColumnName(colName)));
    }
  }
}

/**
 * Escapes a possibly dot-separated column path so each segment is individually backquoted
 * once the caller wraps the whole thing in backquotes (e.g. {@code a.b} -> {@code a`.`b}).
 */
private static String escapeColumnName(String colName) {
  // Literal replacement; the original regex form replaceAll("\\.", ...) was equivalent but
  // needlessly engaged the regex engine.
  return colName.replace(".", "`.`");
}