static Object fromBeamType()

in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/DataCastingUtils.java [201:319]


  /**
   * Converts a Beam-typed field value into a plain Java representation, dispatching on the Beam
   * schema type declared for the field.
   *
   * <ul>
   *   <li>BYTE, INT16, INT32, INT64, STRING, FLOAT, DOUBLE, BOOLEAN and BYTES: the value is
   *       returned as is.
   *   <li>DATETIME: the instant's epoch millis are turned into a UTC {@code OffsetDateTime}.
   *   <li>DECIMAL: the value is narrowed with {@code doubleValue()} and a warning is logged for
   *       every converted value.
   *   <li>ARRAY and ITERABLE: a list whose elements are converted recursively with the collection
   *       element type.
   *   <li>MAP: a {@code Map<String, Object>} whose keys and values are converted recursively; only
   *       string key types are accepted.
   *   <li>ROW: a {@code Map<String, Object>} keyed by field name, each field converted
   *       recursively under its own name and type.
   *   <li>LOGICAL_TYPE: dispatched on the runtime class of the value ({@code java.time.Instant},
   *       any other {@code TemporalAccessor}, or {@code EnumerationType.Value}).
   * </ul>
   *
   * <p>For the DATETIME, DECIMAL, ARRAY, ITERABLE, MAP and ROW cases, {@code null} is returned
   * when {@code castValue} yields {@code null}. The MAP key-type check runs before the value is
   * looked at.
   *
   * @param name field name, used in log and error messages and passed down to nested collection
   *     and map elements
   * @param type Beam schema type describing {@code value}
   * @param value the value to convert, may be {@code null}
   * @return the converted value, possibly {@code null}
   * @throws RuntimeException if a MAP has a non-string key type, if a LOGICAL_TYPE value has an
   *     unhandled runtime class, or if the type name is not handled by any case
   */
  static Object fromBeamType(String name, Schema.FieldType type, Object value) {
    switch (type.getTypeName()) {
      case BYTE:
      case INT16:
      case INT32:
      case INT64:
      case STRING:
      case FLOAT:
      case DOUBLE:
      case BOOLEAN:
      case BYTES:
        // Pass-through types: handed back untouched.
        return value;
      case DATETIME:
        {
          ReadableInstant instant = castValue(value, ReadableInstant.class);
          return instant == null
              ? null
              : java.time.Instant.ofEpochMilli(instant.getMillis()).atOffset(ZoneOffset.UTC);
        }
      case DECIMAL:
        {
          BigDecimal decimal = castValue(value, BigDecimal.class);
          if (decimal == null) {
            return null;
          }

          // Lossy by design: the warning tells the user the field is stored as a double.
          LOG.warn(
              "Type '{}' is not supported in Neo4j, converting field '{}' to Float64 instead.",
              type.getTypeName(),
              name);
          return decimal.doubleValue();
        }
      case ARRAY:
      case ITERABLE:
        {
          Collection<?> elements = castValue(value, Collection.class);
          if (elements == null) {
            return null;
          }

          Schema.FieldType elementType = type.getCollectionElementType();
          // Nested elements are reported under the name of the enclosing field.
          return elements.stream()
              .map(item -> fromBeamType(name, elementType, item))
              .collect(Collectors.toList());
        }
      case MAP:
        {
          Schema.FieldType mapKeyType = type.getMapKeyType();
          // Reject unsupported key types up front, before the value itself is inspected.
          if (!mapKeyType.getTypeName().isStringType()) {
            String keyError =
                String.format(
                    "Only strings are supported as MAP key values, found '%s' in field '%s'",
                    mapKeyType.getTypeName(), name);
            LOG.error(keyError);
            throw new RuntimeException(keyError);
          }

          Map<?, ?> entries = castValue(value, Map.class);
          if (entries == null) {
            return null;
          }

          Schema.FieldType mapValueType = type.getMapValueType();
          Map<String, Object> converted = new HashMap<>(entries.size());
          entries.forEach(
              (entryKey, entryValue) ->
                  converted.put(
                      castValue(fromBeamType(name, mapKeyType, entryKey), String.class),
                      fromBeamType(name, mapValueType, entryValue)));
          return converted;
        }
      case ROW:
        {
          Row row = castValue(value, Row.class);
          if (row == null) {
            return null;
          }

          Map<String, Object> properties = new HashMap<>(row.getFieldCount());
          for (Schema.Field rowField : row.getSchema().getFields()) {
            String fieldName = rowField.getName();
            Object converted =
                fromBeamType(fieldName, rowField.getType(), row.getValue(fieldName));
            properties.put(fieldName, converted);
          }
          return properties;
        }
      case LOGICAL_TYPE:
        {
          if (value == null) {
            return null;
          }
          // Instant is itself a TemporalAccessor, so it has to be matched first.
          if (value instanceof java.time.Instant) {
            return ((java.time.Instant) value).atOffset(ZoneOffset.UTC);
          }
          if (value instanceof TemporalAccessor) {
            return value;
          }
          if (value instanceof EnumerationType.Value) {
            return ((EnumerationType.Value) value).getValue();
          }

          String logicalError =
              String.format(
                  "Field '%s' of type '%s' ('%s') is not supported.",
                  name, type.getTypeName().name(), type.getLogicalType().getIdentifier());
          LOG.error(logicalError);
          throw new RuntimeException(logicalError);
        }
      default:
        {
          String unsupportedError =
              String.format(
                  "Field '%s' of type '%s' is not supported.", name, type.getTypeName().name());
          LOG.error(unsupportedError);
          throw new RuntimeException(unsupportedError);
        }
    }
  }