in supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/ConnectRecordJsonPathExtract.java [51:83]
/**
 * Builds one {@code Variable} per entry of {@code extractList}, in list order, by resolving the
 * entry's JSONPath against the given record.
 *
 * <p>Resolution rules, applied per element:
 * <ul>
 *   <li>Path rejected by {@code JsonPathUtil.isValidAndDefinite}: the path string itself becomes
 *       the variable value.</li>
 *   <li>Path equal to {@code JSONPATH_PREFIX}, {@code JSONPATH_PREFIX_WITH_POINT} or
 *       {@code JsonPathUtil.JSONPATH_DATA}: the value is {@code connectRecord.getData()}.</li>
 *   <li>Path starting with {@code JsonPathUtil.JSONPATH_DATA}: the record data is taken as JSON
 *       text (used as-is when it is a {@code String}, otherwise serialized with Gson) and queried
 *       via {@code JsonPathUtil.readJsonPathValue} with the path returned by
 *       {@code JsonPathUtil.removeDataOfJsonPath}.</li>
 *   <li>Any other accepted path: the first two characters are dropped and the remainder is used
 *       as the key for {@code connectRecord.getExtension}.</li>
 * </ul>
 *
 * @param connectRecord the record whose data and extensions are read
 * @return a new list of variables; empty when {@code extractList} is {@code null} or empty
 * @throws EventBridgeException declared by the signature; presumably raised by the JSONPath
 *         helpers - TODO confirm against {@code JsonPathUtil}
 */
public List<Variable> parseElementFromConnectRecord(ConnectRecord connectRecord) throws EventBridgeException {
    if (extractList == null || extractList.isEmpty()) {
        return Lists.newArrayListWithCapacity(0);
    }
    List<Variable> variableList = Lists.newArrayListWithCapacity(extractList.size());
    // JSON text of the record data. Built on first use and then shared by every data sub-path
    // element, so the payload is serialized at most once per call instead of once per element.
    String dataAsJson = null;
    for (JsonPathElement element : extractList) {
        String jsonPath = element.getJsonPath();
        if (!JsonPathUtil.isValidAndDefinite(jsonPath)) {
            // Not a usable JSONPath: the configured text is passed through as the value.
            variableList.add(new Variable(element.getVariableName(), element.getJsonPath()));
        } else if (jsonPath.equals(JSONPATH_PREFIX)
            || jsonPath.equals(JSONPATH_PREFIX_WITH_POINT)
            || jsonPath.equals(JsonPathUtil.JSONPATH_DATA)) {
            variableList.add(new Variable(element.getVariableName(), connectRecord.getData()));
        } else if (jsonPath.startsWith(JsonPathUtil.JSONPATH_DATA)) {
            if (dataAsJson == null) {
                Object data = connectRecord.getData();
                // Neither branch can yield null (Gson renders a null payload as "null"),
                // so null is a safe "not computed yet" marker.
                dataAsJson = data instanceof String ? (String) data : new Gson().toJson(data);
            }
            variableList.add(new Variable(element.getVariableName(), JsonPathUtil.readJsonPathValue(dataAsJson,
                JsonPathUtil.removeDataOfJsonPath(jsonPath))));
        } else {
            // Drop the two leading characters of the path to obtain the extension key.
            String key = jsonPath.substring(2);
            variableList.add(new Variable(element.getVariableName(), connectRecord.getExtension(key)));
        }
    }
    return variableList;
}