in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/DataCastingUtils.java [201:319]
static Object fromBeamType(String name, Schema.FieldType type, Object value) {
switch (type.getTypeName()) {
case BYTE:
case INT16:
case INT32:
case INT64:
case STRING:
case FLOAT:
case DOUBLE:
case BOOLEAN:
case BYTES:
{
return value;
}
case DATETIME:
{
ReadableInstant typedValue = castValue(value, ReadableInstant.class);
if (Objects.isNull(typedValue)) {
return null;
}
return java.time.Instant.ofEpochMilli(typedValue.getMillis()).atOffset(ZoneOffset.UTC);
}
case DECIMAL:
{
BigDecimal typedValue = castValue(value, BigDecimal.class);
if (Objects.isNull(typedValue)) {
return null;
}
LOG.warn(
"Type '{}' is not supported in Neo4j, converting field '{}' to Float64 instead.",
type.getTypeName(),
name);
return typedValue.doubleValue();
}
case ARRAY:
case ITERABLE:
{
Collection<?> typedValue = castValue(value, Collection.class);
if (Objects.isNull(typedValue)) {
return null;
}
return typedValue.stream()
.map(element -> fromBeamType(name, type.getCollectionElementType(), element))
.collect(Collectors.toList());
}
case MAP:
{
Schema.FieldType keyType = type.getMapKeyType();
Schema.FieldType valueType = type.getMapValueType();
if (!keyType.getTypeName().isStringType()) {
var message =
String.format(
"Only strings are supported as MAP key values, found '%s' in field '%s'",
keyType.getTypeName(), name);
LOG.error(message);
throw new RuntimeException(message);
}
Map<?, ?> typedValue = castValue(value, Map.class);
if (Objects.isNull(typedValue)) {
return null;
}
Map<String, Object> result = new HashMap<>(typedValue.size());
for (var element : typedValue.entrySet()) {
result.put(
castValue(fromBeamType(name, keyType, element.getKey()), String.class),
fromBeamType(name, valueType, element.getValue()));
}
return result;
}
case ROW:
{
Row typedValue = castValue(value, Row.class);
if (Objects.isNull(typedValue)) {
return null;
}
Map<String, Object> result = new HashMap<>(typedValue.getFieldCount());
for (var field : typedValue.getSchema().getFields()) {
var fieldName = field.getName();
result.put(
fieldName,
fromBeamType(fieldName, field.getType(), typedValue.getValue(fieldName)));
}
return result;
}
case LOGICAL_TYPE:
{
if (Objects.isNull(value)) {
return null;
} else if (value instanceof java.time.Instant) {
return ((java.time.Instant) value).atOffset(ZoneOffset.UTC);
} else if (value instanceof TemporalAccessor) {
return value;
} else if (value instanceof EnumerationType.Value) {
return ((EnumerationType.Value) value).getValue();
} else {
var message =
String.format(
"Field '%s' of type '%s' ('%s') is not supported.",
name, type.getTypeName().name(), type.getLogicalType().getIdentifier());
LOG.error(message);
throw new RuntimeException(message);
}
}
default:
{
var message =
String.format(
"Field '%s' of type '%s' is not supported.", name, type.getTypeName().name());
LOG.error(message);
throw new RuntimeException(message);
}
}
}