public static void setRecordByType()

in src/main/java/com/aliyun/odps/kafka/connect/utils/ConverterHelper.java [16:78]


  public static void setRecordByType(ArrayRecord rec, int idx, String value) throws ParseException {
    if (value == null) {
      rec.set(idx, null);
      return;
    }
    switch (rec.getColumns()[idx].getTypeInfo().getOdpsType()) {
      case STRING:
        rec.setString(idx, value);
        break;
      case BIGINT:
        rec.setBigint(idx, Long.valueOf(value));
        break;
      case DOUBLE:
        switch (value) {
          case "nan":
            rec.setDouble(idx, Double.NaN);
            break;
          case "inf":
            rec.setDouble(idx, Double.POSITIVE_INFINITY);
            break;
          case "-inf":
            rec.setDouble(idx, Double.NEGATIVE_INFINITY);
            break;
          default:
            rec.setDouble(idx, Double.valueOf(value));
            break;
        }
        break;
      case DATE:
        // TODO add Time zone config, now use system default
        rec.setDateAsLocalDate(idx, LocalDate.parse(value));
        break;
      case TIMESTAMP:
        // TODO add Time zone config, now use system default
        DateTimeFormatter
            TsFormatter =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSS")
                .withZone(ZoneId.systemDefault());
        Instant instant = ZonedDateTime.parse(value, TsFormatter).toInstant();
        rec.setTimestampAsInstant(idx, instant);
        break;
      case BOOLEAN:
        rec.setBoolean(idx, Boolean.valueOf(value));
        break;
      case DATETIME:
        // TODO add Time zone config, now use system default
        DateTimeFormatter
            DtFormatter =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
        ZonedDateTime zonedDateTime = ZonedDateTime.parse(value, DtFormatter);
        rec.setDatetimeAsZonedDateTime(idx, zonedDateTime);
        break;
      case DECIMAL:
        rec.setDecimal(idx, new BigDecimal(value));
        break;
      case JSON:
        rec.setJsonValue(idx, new SimpleJsonValue(value));
        break;
      default:
        throw new RuntimeException("Unsupported type " +
                                   rec.getColumns()[idx].getTypeInfo().getOdpsType());
    }
  }