in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamUtils.java [40:148]
/**
 * Builds the Beam {@link Schema} describing the source fields referenced by a target's property
 * mappings.
 *
 * <p>Query targets yield an empty schema. For node and relationship targets, every mapping
 * returned by {@code ModelUtils.allPropertyMappings} contributes one field named after the
 * mapping's source field, in iteration order. Mappings without a target property type are
 * delegated to {@code defaultFieldSchema}; typed mappings become nullable fields of the
 * corresponding Beam field type.
 *
 * @param target the target whose schema is derived
 * @param startNodeTarget start node target, forwarded to {@code ModelUtils.allPropertyMappings}
 * @param endNodeTarget end node target, forwarded to {@code ModelUtils.allPropertyMappings}
 * @return the resulting schema (empty for query targets)
 * @throws IllegalArgumentException if the target is not a query, node or relationship target, or
 *     if a mapping declares a property type this method does not handle
 * @throws RuntimeException if a mapping has an empty source field
 */
public static Schema toBeamSchema(
    Target target, NodeTarget startNodeTarget, NodeTarget endNodeTarget) {
  TargetType kind = target.getTargetType();
  if (kind == TargetType.QUERY) {
    // Query targets carry no property mappings to describe.
    return new Schema(new ArrayList<>());
  }
  boolean isEntityTarget = kind == TargetType.NODE || kind == TargetType.RELATIONSHIP;
  if (!isEntityTarget) {
    throw new IllegalArgumentException(
        String.format("Expected relationship or node target, got %s", kind));
  }

  List<Schema.Field> schemaFields = new ArrayList<>();
  List<PropertyMapping> propertyMappings =
      ModelUtils.allPropertyMappings((EntityTarget) target, startNodeTarget, endNodeTarget);
  for (PropertyMapping propertyMapping : propertyMappings) {
    String sourceField = propertyMapping.getSourceField();
    if (StringUtils.isEmpty(sourceField)) {
      throw new RuntimeException(
          "Could not find field name or constant in target: " + target.getName());
    }

    PropertyType declaredType = propertyMapping.getTargetPropertyType();
    if (declaredType == null) {
      // Untyped mapping: let the shared default decide the field's schema.
      schemaFields.add(defaultFieldSchema(sourceField));
      continue;
    }

    // The switch only selects the Beam type; the nullable field is assembled once below.
    FieldType beamType;
    switch (declaredType) {
      case BOOLEAN:
        beamType = FieldType.BOOLEAN;
        break;
      case BOOLEAN_ARRAY:
        beamType = FieldType.array(FieldType.BOOLEAN);
        break;
      case BYTE_ARRAY:
        beamType = FieldType.BYTES;
        break;
      case DATE:
        beamType = FieldType.logicalType(new Date());
        break;
      case DATE_ARRAY:
        beamType = FieldType.array(FieldType.logicalType(new Date()));
        break;
      case DURATION:
        beamType = FieldType.logicalType(new NanosDuration());
        break;
      case DURATION_ARRAY:
        beamType = FieldType.array(FieldType.logicalType(new NanosDuration()));
        break;
      case FLOAT:
        beamType = FieldType.DOUBLE;
        break;
      case FLOAT_ARRAY:
        beamType = FieldType.array(FieldType.DOUBLE);
        break;
      case INTEGER:
        beamType = FieldType.INT64;
        break;
      case INTEGER_ARRAY:
        beamType = FieldType.array(FieldType.INT64);
        break;
      case LOCAL_DATETIME:
        beamType = FieldType.logicalType(new DateTime());
        break;
      case LOCAL_DATETIME_ARRAY:
        beamType = FieldType.array(FieldType.logicalType(new DateTime()));
        break;
      case LOCAL_TIME:
      case ZONED_TIME:
        // Both time flavours share the same Time logical type.
        beamType = FieldType.logicalType(new Time());
        break;
      case LOCAL_TIME_ARRAY:
      case ZONED_TIME_ARRAY:
        beamType = FieldType.array(FieldType.logicalType(new Time()));
        break;
      case POINT:
      case STRING:
        // Points are carried as strings in the schema.
        beamType = FieldType.STRING;
        break;
      case POINT_ARRAY:
      case STRING_ARRAY:
        beamType = FieldType.array(FieldType.STRING);
        break;
      case ZONED_DATETIME:
        beamType = FieldType.logicalType(new IsoDateTime());
        break;
      case ZONED_DATETIME_ARRAY:
        beamType = FieldType.array(FieldType.logicalType(new IsoDateTime()));
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Unsupported property type: %s", declaredType));
    }
    schemaFields.add(Schema.Field.nullable(sourceField, beamType));
  }
  return new Schema(schemaFields);
}