in awswrangler/opensearch/_write.py [0:0]
def _get_documents_w_json_path(documents: list[Mapping[str, Any]], json_path: str) -> list[Any]:
from jsonpath_ng.exceptions import JsonPathParserError
try:
jsonpath_expression = jsonpath_ng.parse(json_path)
except JsonPathParserError as e:
_logger.error("invalid json_path: %s", json_path)
raise e
output_documents = []
for doc in documents:
for match in jsonpath_expression.find(doc):
match_value = match.value
if isinstance(match_value, list):
output_documents += match_value
elif isinstance(match_value, dict):
output_documents.append(match_value)
else:
msg = f"expected json_path value to be a list/dict. received type {type(match_value)} ({match_value})"
raise ValueError(msg)
return output_documents