public static List sourceTextToTargetObjects()

in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/DataCastingUtils.java [79:190]


  /**
   * Casts the values of a text-sourced {@code row} to the types declared by the target schema.
   *
   * <p>The schema is obtained from {@code BeamUtils.toBeamSchema(target, startNodeTarget,
   * endNodeTarget)}. For each schema field, in schema order, the value is read from {@code row} by
   * field name and converted with the {@code as*} helper matching the field type. Exactly one
   * element is appended to the result per schema field, so the returned list stays positionally
   * aligned with the schema.
   *
   * <p>A {@code null} element is produced when:
   *
   * <ul>
   *   <li>the value cannot be read from the row or is {@code null} (the field name is also
   *       reported in a single "not found" warning at the end),
   *   <li>the field type is not supported for text sources (array, iterable, map, row, or an
   *       unrecognized logical type), or
   *   <li>the conversion helper throws an exception for the given value.
   * </ul>
   *
   * <p>All of these conditions are logged as warnings; none of them is propagated to the caller.
   *
   * @param row source row holding the raw values, looked up by schema field name
   * @param target target whose schema drives the conversion
   * @param startNodeTarget start node target passed through to the schema computation
   * @param endNodeTarget end node target passed through to the schema computation
   * @return the converted values, one per schema field, with {@code null} for values that are
   *     missing, unsupported or invalid
   */
  public static List<Object> sourceTextToTargetObjects(
      Row row, Target target, NodeTarget startNodeTarget, NodeTarget endNodeTarget) {
    Schema targetSchema = BeamUtils.toBeamSchema(target, startNodeTarget, endNodeTarget);
    List<Schema.Field> fields = targetSchema.getFields();

    List<Object> castVals = new ArrayList<>(fields.size());
    List<String> missingFields = new ArrayList<>();

    for (Schema.Field field : fields) {
      String fieldName = field.getName();
      Schema.FieldType type = field.getType();
      Object objVal = null;

      try {
        objVal = row.getValue(fieldName);
      } catch (Exception e) {
        // Best effort: a field that cannot be read is treated like a missing value below.
        LOG.warn("Error getting value for field '{}'", fieldName, e);
      }

      if (objVal == null) {
        missingFields.add(fieldName);
        castVals.add(null);
        continue;
      }

      // Stays null unless a conversion succeeds. It is appended exactly once after the switch so
      // that no branch (including types added to TypeName later) can break the alignment between
      // the result list and the schema fields.
      Object castVal = null;
      try {
        TypeName typeName = type.getTypeName();
        switch (typeName) {
          case BYTE:
          case INT16:
          case INT32:
          case INT64:
            castVal = asLong(objVal);
            break;
          case DECIMAL:
          case FLOAT:
          case DOUBLE:
            castVal = asDouble(objVal);
            break;
          case STRING:
            castVal = asString(objVal);
            break;
          case DATETIME:
            castVal = asDateTime(objVal, ZonedDateTime::from, OffsetDateTime::from);
            break;
          case BOOLEAN:
            castVal = asBoolean(objVal);
            break;
          case BYTES:
            castVal = asByteArray(objVal);
            break;
          case LOGICAL_TYPE:
            {
              String logicalTypeId = type.getLogicalType().getIdentifier();
              switch (logicalTypeId) {
                case NanosDuration.IDENTIFIER:
                  castVal = asDuration(objVal);
                  break;
                case org.apache.beam.sdk.schemas.logicaltypes.Date.IDENTIFIER:
                  castVal = asDate(objVal);
                  break;
                case org.apache.beam.sdk.schemas.logicaltypes.DateTime.IDENTIFIER:
                  castVal = asDateTime(objVal, LocalDateTime::from);
                  break;
                case IsoDateTime.IDENTIFIER:
                  castVal = asDateTime(objVal);
                  break;
                case Time.IDENTIFIER:
                  castVal = asTime(objVal);
                  break;
                default:
                  // Report the logical type identifier: the type name alone is always
                  // LOGICAL_TYPE here and does not tell which type was rejected.
                  LOG.warn(
                      "Mapping '{} ({})' types from text sources is not supported.",
                      typeName,
                      logicalTypeId);
                  break;
              }
              break;
            }
          case ARRAY:
          case ITERABLE:
          case MAP:
          case ROW:
          default:
            LOG.warn("Mapping '{}' types from text sources is not supported.", typeName);
            break;
        }
      } catch (Exception e) {
        // Only exceptions are treated as invalid data; JVM errors (e.g. OutOfMemoryError) are
        // deliberately left to propagate instead of being turned into a null value.
        LOG.warn(
            "Invalid value '{}' for type '{}{}'",
            objVal,
            type.getTypeName(),
            type.getTypeName().isLogicalType()
                ? String.format(" (%s)", type.getLogicalType().getIdentifier())
                : "",
            e);
      }
      castVals.add(castVal);
    }

    if (!missingFields.isEmpty()) {
      LOG.warn("Value for fields {} were not found.", missingFields);
    }

    return castVals;
  }