public void convert()

in src/main/java/com/aliyun/odps/kafka/connect/converter/JsonRecordConverter.java [26:45]


  /**
   * Copies one sink record into the given output record.
   *
   * <p>The topic, partition and offset columns are always written first. After that,
   * depending on the converter's {@code mode}, either the record key or the record value
   * is passed through {@code convertToJson} and stored in the matching column. A
   * {@code null} key/value leaves that column untouched.
   *
   * @param in  the sink record to read from
   * @param out the output record to populate
   * @throws IOException declared by this method; presumably raised by {@code convertToJson}
   *                     (not visible here — confirm against its definition)
   * @throws RuntimeException if {@code mode} is neither {@code KEY} nor {@code VALUE}
   */
  public void convert(SinkRecord in, Record out) throws IOException {
    // Metadata columns are filled in unconditionally, before the mode is inspected.
    final String topicName = in.topic();
    out.setString(TOPIC, topicName);

    final long partitionId = in.kafkaPartition().longValue();
    out.setBigint(PARTITION, partitionId);

    final long recordOffset = in.kafkaOffset();
    out.setBigint(OFFSET, recordOffset);

    switch (mode) {
      case KEY: {
        final Object rawKey = in.key();
        if (rawKey == null) {
          // Nothing to serialize; the key column is left as it was.
          break;
        }
        out.set(KEY, convertToJson(rawKey));
        break;
      }
      case VALUE: {
        final Object rawValue = in.value();
        if (rawValue == null) {
          // Nothing to serialize; the value column is left as it was.
          break;
        }
        out.set(VALUE, convertToJson(rawValue));
        break;
      }
      default:
        // Covers DEFAULT as well as any other mode this converter does not handle.
        throw new RuntimeException("Unsupported mode for jsonConverter:" + mode);
    }
  }