public void convert()

in src/main/java/com/aliyun/odps/kafka/connect/converter/FlattenRecordConverter.java [45:78]


  public void convert(SinkRecord in, Record out) throws JsonProcessingException {
    out.setString(TOPIC, in.topic());
    out.setBigint(PARTITION, in.kafkaPartition().longValue());
    out.setBigint(OFFSET, in.kafkaOffset());
    Map<String, Object> flattenRecord = new HashMap<>();
    switch (mode) {
      case KEY:
        if (in.key() != null) {
          flattenRecord = flattenFieldFromJson(in.key());
        }
        break;
      case VALUE:
        if (in.value() != null) {
          flattenRecord = flattenFieldFromJson(in.value());
        }
        break;
      case DEFAULT:
      default:
        throw new RuntimeException("Unsupported mode for FlattenConverter:" + mode);
    }
    for (String key : flattenRecord.keySet()) {
      checkColumnExist(key);
      try {
        String values = flattenRecord.get(key) instanceof Map ?
                        JsonHandler.getJsonString(flattenRecord.get(key))
                                                              : flattenRecord.get(key).toString();

        ConverterHelper.setRecordByType((ArrayRecord) out, lookID.get(key.toLowerCase()),
                                        values);
      } catch (ParseException e) {
        throw new RuntimeException(e);
      }
    }
  }