public String getDetailType()

in src/main/java/software/amazon/event/kafkaconnector/mapping/JsonPathDetailTypeMapper.java [41:108]


  public String getDetailType(SinkRecord record) {
    if (record == null) {
      throw new IllegalArgumentException("SinkRecord is null. Unable to extract detail type.");
    }

    if (record.topic() == null || record.topic().trim().isEmpty()) {
      throw new IllegalArgumentException(
          "SinkRecord topic is null or empty but is required for fallback logic.");
    }

    var topic = record.topic();
    try {
      var jsonBytes = jsonConverter.fromConnectData(topic, record.valueSchema(), record.value());
      // super defensive, because null here should never be the case
      var jsonString = new String(jsonBytes).trim();
      if (jsonBytes == null || jsonString.isEmpty()) {
        log.error(
            "Record value conversion to JSON bytes returned null or empty string for record '{}', using topic '{}' as"
                + " fallback",
            record,
            topic);
        return topic;
      }

      var extractedValue = JsonPath.using(jsonPathConfiguration).parse(jsonString).read(jsonPath);

      if (extractedValue == null) {
        log.warn(
            "Parsed JSON value is null for JSON path '{}' and record '{}', using topic '{}' as fallback",
            jsonPath,
            record,
            topic);
        return topic;
      }

      if (!(extractedValue instanceof String)) {
        log.warn(
            "Parsed JSON value is not of type String for for JSON path '{}' and record '{}', using topic '{}' as fallback",
            jsonPath,
            record,
            topic);
        return topic;
      }

      if (((String) extractedValue).trim().isEmpty()) {
        log.warn(
            "Parsed JSON value is empty String for JSON path '{}' and record '{}', using topic '{}' as fallback",
            jsonPath,
            record,
            topic);
        return topic;
      }

      log.trace(
          "Successfully extracted detail type '{}' for JSON path '{}' and record '{}'",
          extractedValue,
          jsonPath,
          record);
      return (String) extractedValue;
    } catch (Exception e) {
      log.error(
          "Could not extract JSON value for JSON path '{}' and record '{}', using topic '{}' as fallback",
          jsonPath,
          record,
          topic);
      return topic;
    }
  }