in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java [49:189]
/**
 * Recursively renders an Avro {@link Schema} as a Hive column / type string.
 *
 * <p>When {@code topLevel} is {@code true} the schema must be a RECORD; each of its fields is emitted as a
 * column definition of the form {@code `name` type COMMENT 'from flatten_source ...'}, with definitions
 * separated by {@code ", \n"}. For every nested call ({@code topLevel == false}) only the type expression is
 * produced: complex types are rendered as {@code <hive type name><...>} using the name looked up in
 * {@code HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12}, and leaf types are rendered either from
 * their Avro logical type (date, decimal, time-millis) or from the same lookup table.
 *
 * @param schema Avro schema to convert
 * @param hiveColumns if present, receives a {@code field name -> Hive type} entry for every field of the
 *        top-level record (nested record fields are not added)
 * @param topLevel whether {@code schema} is the table-level record
 * @param datasetName dataset name, used only in error messages
 * @return the Hive column list (top level) or Hive type expression (nested); empty for an Avro NULL schema
 * @throws IllegalArgumentException if {@code topLevel} is {@code true} and the schema is not a RECORD
 * @throws AvroRuntimeException if the schema type is not handled by any case below
 */
public static String generateAvroToHiveColumnMapping(Schema schema, Optional<Map<String, String>> hiveColumns,
    boolean topLevel, String datasetName) {
  if (topLevel && !schema.getType().equals(Schema.Type.RECORD)) {
    throw new IllegalArgumentException(String
        .format("Schema for table must be of type RECORD. Received type: %s for dataset %s", schema.getType(),
            datasetName));
  }

  StringBuilder columns = new StringBuilder();
  boolean isFirst;
  switch (schema.getType()) {
    case RECORD:
      isFirst = true;
      if (topLevel) {
        // Table-level record: one column definition per field.
        for (Schema.Field field : schema.getFields()) {
          if (isFirst) {
            isFirst = false;
          } else {
            columns.append(", \n");
          }
          String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
          if (hiveColumns.isPresent()) {
            hiveColumns.get().put(field.name(), type);
          }
          // Fall back to the field's own name when no "flatten_source" property is set.
          String flattenSource = field.getProp("flatten_source");
          if (StringUtils.isBlank(flattenSource)) {
            flattenSource = field.name();
          }
          columns
              .append(String.format("  `%s` %s COMMENT 'from flatten_source %s'", field.name(), type, flattenSource));
        }
      } else {
        // Nested record: <type name><`field`:type,...>
        columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
        for (Schema.Field field : schema.getFields()) {
          if (isFirst) {
            isFirst = false;
          } else {
            columns.append(",");
          }
          String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
          columns.append("`").append(field.name()).append("`").append(":").append(type);
        }
        columns.append(">");
      }
      break;

    case UNION:
      Optional<Schema> optionalType = isOfOptionType(schema);
      if (optionalType.isPresent()) {
        // Option-style union: render only the schema returned by isOfOptionType.
        Schema optionalTypeSchema = optionalType.get();
        columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false, datasetName));
      } else {
        columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
        isFirst = true;
        for (Schema unionMember : schema.getTypes()) {
          // NULL members contribute nothing to the rendered type list.
          if (Schema.Type.NULL.equals(unionMember.getType())) {
            continue;
          }
          if (isFirst) {
            isFirst = false;
          } else {
            columns.append(",");
          }
          columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false, datasetName));
        }
        columns.append(">");
      }
      break;

    case MAP:
      // The key type is always rendered as "string"; only the value type is converted.
      columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
      columns.append("string,")
          .append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false, datasetName));
      columns.append(">");
      break;

    case ARRAY:
      columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
      columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false, datasetName));
      columns.append(">");
      break;

    case NULL:
      // Rendered as an empty string.
      break;

    case BYTES:
    case DOUBLE:
    case ENUM:
    case FIXED:
    case FLOAT:
    case INT:
    case LONG:
    case STRING:
    case BOOLEAN:
      // Handling Avro Logical Types which should always sit in leaf-level.
      try {
        String hiveSpecificLogicalType = generateHiveSpecificLogicalType(schema);
        if (StringUtils.isNotEmpty(hiveSpecificLogicalType)) {
          // A Hive-specific logical type takes precedence over everything else; leave the outer switch.
          columns.append(hiveSpecificLogicalType);
          break;
        }
      } catch (AvroSerdeException ae) {
        // Best effort: log and continue with the Avro logical / physical type handling below.
        log.error("Failed to generate logical type string for field " + schema.getName() + " due to:", ae);
      }

      boolean isLogicalTypeSet = false;
      LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
      if (logicalType != null) {
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
        switch (logicalType.getName().toLowerCase(java.util.Locale.ROOT)) {
          case HiveAvroTypeConstants.DATE:
            LogicalTypes.Date dateType = (LogicalTypes.Date) logicalType;
            dateType.validate(schema);
            columns.append("date");
            isLogicalTypeSet = true;
            break;
          case HiveAvroTypeConstants.DECIMAL:
            LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
            decimalType.validate(schema);
            columns.append(String.format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale()));
            isLogicalTypeSet = true;
            break;
          case HiveAvroTypeConstants.TIME_MILLIS:
            LogicalTypes.TimeMillis timeMillsType = (LogicalTypes.TimeMillis) logicalType;
            timeMillsType.validate(schema);
            columns.append("timestamp");
            isLogicalTypeSet = true;
            break;
          default:
            // Use the non-null local logicalType: schema.getLogicalType() is not null-checked here and
            // dereferencing it could throw while merely trying to log the fallback.
            log.error("Unsupported logical type " + logicalType.getName() + ", fallback to physical type");
        }
      }

      if (!isLogicalTypeSet) {
        // No usable logical type: fall back to the physical-type lookup.
        columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
      }
      break;

    default:
      String exceptionMessage =
          String.format("DDL query generation failed for \"%s\" of dataset %s", schema, datasetName);
      log.error(exceptionMessage);
      throw new AvroRuntimeException(exceptionMessage);
  }

  return columns.toString();
}