public static Schema toBeamSchema()

in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamUtils.java [40:148]


  /**
   * Builds the Beam {@link Schema} describing the source fields read for a target.
   *
   * <p>A {@code QUERY} target yields a schema without fields. For {@code NODE} and {@code
   * RELATIONSHIP} targets, one nullable field is emitted per property mapping returned by {@code
   * ModelUtils.allPropertyMappings}, in iteration order, named after the mapping's source field.
   * Mappings without a target property type fall back to {@code defaultFieldSchema}.
   *
   * @param target the target whose schema is computed
   * @param startNodeTarget start node target, forwarded to {@code ModelUtils.allPropertyMappings}
   * @param endNodeTarget end node target, forwarded to {@code ModelUtils.allPropertyMappings}
   * @return the schema holding one field per property mapping (none for query targets)
   * @throws IllegalArgumentException if the target is neither a query, node nor relationship
   *     target, or if a mapping declares a property type with no Beam equivalent here
   * @throws RuntimeException if a mapping's source field is empty according to {@code
   *     StringUtils.isEmpty}
   */
  public static Schema toBeamSchema(
      Target target, NodeTarget startNodeTarget, NodeTarget endNodeTarget) {
    TargetType kind = target.getTargetType();
    List<Schema.Field> schemaFields = new ArrayList<>();
    if (kind == TargetType.QUERY) {
      // Query targets carry no property mappings: the schema stays empty.
      return new Schema(schemaFields);
    }
    boolean isEntityTarget = kind == TargetType.NODE || kind == TargetType.RELATIONSHIP;
    if (!isEntityTarget) {
      throw new IllegalArgumentException(
          String.format("Expected relationship or node target, got %s", kind));
    }

    EntityTarget entity = (EntityTarget) target;
    for (PropertyMapping propertyMapping :
        ModelUtils.allPropertyMappings(entity, startNodeTarget, endNodeTarget)) {
      String sourceField = propertyMapping.getSourceField();
      if (StringUtils.isEmpty(sourceField)) {
        throw new RuntimeException(
            "Could not find field name or constant in target: " + target.getName());
      }
      PropertyType declaredType = propertyMapping.getTargetPropertyType();
      // Untyped mappings defer to the default field definition.
      schemaFields.add(
          declaredType == null
              ? defaultFieldSchema(sourceField)
              : Schema.Field.nullable(sourceField, beamTypeOf(declaredType)));
    }
    return new Schema(schemaFields);
  }

  /** Maps a non-null property type to the Beam field type used for its source column. */
  private static FieldType beamTypeOf(PropertyType propertyType) {
    switch (propertyType) {
      case BOOLEAN:
        return FieldType.BOOLEAN;
      case BOOLEAN_ARRAY:
        return FieldType.array(FieldType.BOOLEAN);
      case BYTE_ARRAY:
        return FieldType.BYTES;
      case DATE:
        return FieldType.logicalType(new Date());
      case DATE_ARRAY:
        return FieldType.array(FieldType.logicalType(new Date()));
      case DURATION:
        return FieldType.logicalType(new NanosDuration());
      case DURATION_ARRAY:
        return FieldType.array(FieldType.logicalType(new NanosDuration()));
      case FLOAT:
        return FieldType.DOUBLE;
      case FLOAT_ARRAY:
        return FieldType.array(FieldType.DOUBLE);
      case INTEGER:
        return FieldType.INT64;
      case INTEGER_ARRAY:
        return FieldType.array(FieldType.INT64);
      case LOCAL_DATETIME:
        return FieldType.logicalType(new DateTime());
      case LOCAL_DATETIME_ARRAY:
        return FieldType.array(FieldType.logicalType(new DateTime()));
      // Local and zoned times share the same logical type.
      case LOCAL_TIME:
      case ZONED_TIME:
        return FieldType.logicalType(new Time());
      case LOCAL_TIME_ARRAY:
      case ZONED_TIME_ARRAY:
        return FieldType.array(FieldType.logicalType(new Time()));
      // Points are carried as strings, like plain string properties.
      case POINT:
      case STRING:
        return FieldType.STRING;
      case POINT_ARRAY:
      case STRING_ARRAY:
        return FieldType.array(FieldType.STRING);
      case ZONED_DATETIME:
        return FieldType.logicalType(new IsoDateTime());
      case ZONED_DATETIME_ARRAY:
        return FieldType.array(FieldType.logicalType(new IsoDateTime()));
      default:
        throw new IllegalArgumentException(
            String.format("Unsupported property type: %s", propertyType));
    }
  }