in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/DataCastingUtils.java [79:190]
/**
 * Casts the values of a text-sourced row to the objects expected by the target schema.
 *
 * <p>The schema is built with {@code BeamUtils.toBeamSchema(target, startNodeTarget,
 * endNodeTarget)}. The returned list contains exactly one entry per schema field, in schema
 * order. An entry is {@code null} when:
 *
 * <ul>
 *   <li>the row has no (or a {@code null}) value for the field, or reading it failed;
 *   <li>the field type cannot be mapped from a text source (arrays, iterables, maps, rows and
 *       unrecognized logical types);
 *   <li>the conversion helper threw while casting the value.
 * </ul>
 *
 * <p>None of these conditions aborts the conversion; each one is reported through a warning.
 *
 * @param row the source row whose values are looked up by target field name
 * @param target the target used to build the schema
 * @param startNodeTarget start node target passed through to the schema builder
 * @param endNodeTarget end node target passed through to the schema builder
 * @return the cast values, positionally aligned with the fields of the target schema
 */
public static List<Object> sourceTextToTargetObjects(
    Row row, Target target, NodeTarget startNodeTarget, NodeTarget endNodeTarget) {
  Schema targetSchema = BeamUtils.toBeamSchema(target, startNodeTarget, endNodeTarget);
  List<Schema.Field> fields = targetSchema.getFields();
  List<Object> castVals = new ArrayList<>(fields.size());
  List<String> missingFields = new ArrayList<>();
  for (Schema.Field field : fields) {
    String fieldName = field.getName();
    Schema.FieldType type = field.getType();
    Object objVal = null;
    try {
      objVal = row.getValue(fieldName);
    } catch (Exception e) {
      LOG.warn("Error getting value for field '{}'", fieldName, e);
    }
    if (objVal == null) {
      missingFields.add(fieldName);
      castVals.add(null);
      continue;
    }
    // Stays null unless a conversion below succeeds; appended exactly once after the try block so
    // the result can never drift out of alignment with the schema fields.
    Object castVal = null;
    try {
      TypeName typeName = type.getTypeName();
      switch (typeName) {
        case BYTE:
        case INT16:
        case INT32:
        case INT64:
          castVal = asLong(objVal);
          break;
        case DECIMAL:
        case FLOAT:
        case DOUBLE:
          castVal = asDouble(objVal);
          break;
        case STRING:
          castVal = asString(objVal);
          break;
        case DATETIME:
          castVal = asDateTime(objVal, ZonedDateTime::from, OffsetDateTime::from);
          break;
        case BOOLEAN:
          castVal = asBoolean(objVal);
          break;
        case BYTES:
          castVal = asByteArray(objVal);
          break;
        case LOGICAL_TYPE:
          {
            String logicalTypeId = type.getLogicalType().getIdentifier();
            switch (logicalTypeId) {
              case NanosDuration.IDENTIFIER:
                castVal = asDuration(objVal);
                break;
              case org.apache.beam.sdk.schemas.logicaltypes.Date.IDENTIFIER:
                castVal = asDate(objVal);
                break;
              case org.apache.beam.sdk.schemas.logicaltypes.DateTime.IDENTIFIER:
                castVal = asDateTime(objVal, LocalDateTime::from);
                break;
              case IsoDateTime.IDENTIFIER:
                castVal = asDateTime(objVal);
                break;
              case Time.IDENTIFIER:
                castVal = asTime(objVal);
                break;
              default:
                // Name the concrete logical type: typeName alone is always LOGICAL_TYPE here.
                LOG.warn(
                    "Mapping '{} ({})' types from text sources is not supported.",
                    typeName,
                    logicalTypeId);
                break;
            }
            break;
          }
        case ARRAY:
        case ITERABLE:
        case MAP:
        case ROW:
        default:
          // Composite types, plus any type name this method does not know about, map to null.
          LOG.warn("Mapping '{}' types from text sources is not supported.", typeName);
          break;
      }
    } catch (Throwable t) {
      // Best effort: a value that cannot be cast is reported and replaced by null.
      LOG.warn(
          "Invalid value '{}' for type '{}{}'",
          objVal,
          type.getTypeName(),
          type.getTypeName().isLogicalType()
              ? String.format(" (%s)", type.getLogicalType().getIdentifier())
              : "",
          t);
      castVal = null;
    }
    castVals.add(castVal);
  }
  if (!missingFields.isEmpty()) {
    LOG.warn("Value for fields {} were not found.", missingFields);
  }
  return castVals;
}