def _get_documents_w_json_path()

in awswrangler/opensearch/_write.py [0:0]


def _get_documents_w_json_path(documents: list[Mapping[str, Any]], json_path: str) -> list[Any]:
    """Collect the values selected by a JSONPath expression across documents.

    ``json_path`` is parsed once and then evaluated against every entry of
    ``documents``. A matched list contributes each of its items to the result;
    a matched dict contributes itself as a single item.

    Parameters
    ----------
    documents
        Documents to search.
    json_path
        JSONPath expression applied to each document.

    Returns
    -------
    list[Any]
        Flat list of everything gathered from the matches, in match order.

    Raises
    ------
    JsonPathParserError
        If ``json_path`` cannot be parsed; the bad expression is logged first.
    ValueError
        If a matched value is neither a list nor a dict.
    """
    from jsonpath_ng.exceptions import JsonPathParserError

    try:
        parsed_path = jsonpath_ng.parse(json_path)
    except JsonPathParserError:
        _logger.error("invalid json_path: %s", json_path)
        raise

    collected: list[Any] = []
    for document in documents:
        for found in parsed_path.find(document):
            value = found.value
            if isinstance(value, dict):
                # A single matched object is kept whole.
                collected.append(value)
                continue
            if not isinstance(value, list):
                msg = f"expected json_path value to be a list/dict. received type {type(value)} ({value})"
                raise ValueError(msg)
            # A matched array is flattened into the result.
            collected.extend(value)
    return collected