public static String generateAvroToHiveColumnMapping()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java [49:189]


  /**
   * Recursively renders an Avro {@link Schema} as Hive DDL column/type text.
   *
   * <ul>
   *   <li>Top-level call ({@code topLevel == true}): the schema must be a RECORD. One column definition of the
   *       form {@code `name` type COMMENT 'from flatten_source src'} is produced per field, separated by
   *       {@code ", \n"}. {@code src} is the field's {@code flatten_source} property, or the field name when
   *       that property is blank.</li>
   *   <li>Nested RECORD: the Hive type name looked up in
   *       {@code HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12}, followed by
   *       {@code <`field`:type,...>}.</li>
   *   <li>UNION: when {@code isOfOptionType} yields a schema, that schema's mapping is used directly; otherwise
   *       the mapped type name followed by the comma-separated mappings of all non-NULL members.</li>
   *   <li>MAP / ARRAY: the mapped type name wrapping the value / element mapping; map keys are always rendered
   *       as {@code string}.</li>
   *   <li>NULL: contributes nothing (empty string).</li>
   *   <li>Primitive, ENUM and FIXED types: a logical type is preferred when one can be resolved, otherwise the
   *       mapped physical type name is used.</li>
   * </ul>
   *
   * @param schema      Avro schema to convert
   * @param hiveColumns when present, receives a {@code field name -> Hive type} entry for every field of the
   *                    top-level record (the map is mutated by this method)
   * @param topLevel    {@code true} only for the outermost call, i.e. the table schema itself
   * @param datasetName dataset name, used only in error messages
   * @return the Hive column list (top level) or Hive type string (nested levels)
   * @throws IllegalArgumentException if {@code topLevel} is set and {@code schema} is not a RECORD
   * @throws AvroRuntimeException     if the schema type is not handled by any case below
   */
  public static String generateAvroToHiveColumnMapping(Schema schema, Optional<Map<String, String>> hiveColumns,
      boolean topLevel, String datasetName) {
    if (topLevel && !schema.getType().equals(Schema.Type.RECORD)) {
      throw new IllegalArgumentException(String
          .format("Schema for table must be of type RECORD. Received type: %s for dataset %s", schema.getType(),
              datasetName));
    }

    StringBuilder columns = new StringBuilder();
    boolean isFirst;
    switch (schema.getType()) {
      case RECORD:
        isFirst = true;
        if (topLevel) {
          // Table-level record: every field becomes its own column definition line.
          for (Schema.Field field : schema.getFields()) {
            if (isFirst) {
              isFirst = false;
            } else {
              columns.append(", \n");
            }
            String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
            // Only top-level fields are reported back to the caller through hiveColumns.
            if (hiveColumns.isPresent()) {
              hiveColumns.get().put(field.name(), type);
            }
            String flattenSource = field.getProp("flatten_source");
            if (StringUtils.isBlank(flattenSource)) {
              flattenSource = field.name();
            }
            columns
                .append(String.format("  `%s` %s COMMENT 'from flatten_source %s'", field.name(), type, flattenSource));
          }
        } else {
          // Nested record: rendered inline as <mapped type><`field`:type,...>.
          columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
          for (Schema.Field field : schema.getFields()) {
            if (isFirst) {
              isFirst = false;
            } else {
              columns.append(",");
            }
            String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName);
            columns.append("`").append(field.name()).append("`").append(":").append(type);
          }
          columns.append(">");
        }
        break;
      case UNION:
        Optional<Schema> optionalType = isOfOptionType(schema);
        if (optionalType.isPresent()) {
          // Option-style union: render only the schema returned by isOfOptionType.
          Schema optionalTypeSchema = optionalType.get();
          columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false, datasetName));
        } else {
          columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
          isFirst = true;
          for (Schema unionMember : schema.getTypes()) {
            // NULL members are skipped so they do not appear in the rendered member list.
            if (Schema.Type.NULL.equals(unionMember.getType())) {
              continue;
            }
            if (isFirst) {
              isFirst = false;
            } else {
              columns.append(",");
            }
            columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false, datasetName));
          }
          columns.append(">");
        }
        break;
      case MAP:
        columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
        // The key type is always emitted as "string"; only the value type is derived from the schema.
        columns.append("string,")
            .append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false, datasetName));
        columns.append(">");
        break;
      case ARRAY:
        columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
        columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false, datasetName));
        columns.append(">");
        break;
      case NULL:
        // Nothing is emitted for a bare NULL schema.
        break;
      case BYTES:
      case DOUBLE:
      case ENUM:
      case FIXED:
      case FLOAT:
      case INT:
      case LONG:
      case STRING:
      case BOOLEAN:
        // Handling Avro Logical Types which should always sit in leaf-level.
        boolean isLogicalTypeSet = false;
        try {
          String hiveSpecificLogicalType = generateHiveSpecificLogicalType(schema);
          if (StringUtils.isNoneEmpty(hiveSpecificLogicalType)) {
            isLogicalTypeSet = true;
            columns.append(hiveSpecificLogicalType);
            // Leaves the enclosing switch: the Hive-specific logical type takes precedence over everything below.
            break;
          }
        } catch (AvroSerdeException ae) {
          // Best effort: on failure fall through to the Avro logical type / physical type handling.
          log.error("Failed to generate logical type string for field " + schema.getName() + " due to:", ae);
        }

        LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
        if (logicalType != null) {
          // Locale.ROOT keeps the comparison independent of the JVM default locale (e.g. Turkish dotless i).
          switch (logicalType.getName().toLowerCase(java.util.Locale.ROOT)) {
            case HiveAvroTypeConstants.DATE:
              LogicalTypes.Date dateType = (LogicalTypes.Date) logicalType;
              dateType.validate(schema);
              columns.append("date");
              isLogicalTypeSet = true;
              break;
            case HiveAvroTypeConstants.DECIMAL:
              LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
              decimalType.validate(schema);
              columns.append(String.format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale()));
              isLogicalTypeSet = true;
              break;
            case HiveAvroTypeConstants.TIME_MILLIS:
              LogicalTypes.TimeMillis timeMillsType = (LogicalTypes.TimeMillis) logicalType;
              timeMillsType.validate(schema);
              columns.append("timestamp");
              isLogicalTypeSet = true;
              break;
            default:
              // Use the already-resolved logicalType here: schema.getLogicalType() may be null even though
              // fromSchemaIgnoreInvalid() produced a logical type from the schema's properties.
              log.error("Unsupported logical type " + logicalType.getName() + ", fallback to physical type");
          }
        }

        if (!isLogicalTypeSet) {
          columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
        }
        break;
      default:
        String exceptionMessage =
            String.format("DDL query generation failed for \"%s\" of dataset %s", schema, datasetName);
        log.error(exceptionMessage);
        throw new AvroRuntimeException(exceptionMessage);
    }

    return columns.toString();
  }