def encrypt_batch_write_item()

in src/dynamodb_encryption_sdk/internal/utils.py [0:0]


def encrypt_batch_write_item(encrypt_method, crypto_config_method, write_method, **kwargs):
    # type: (Callable, Callable, Callable, **Any) -> Dict
    # narrow this down
    # https://github.com/aws/aws-dynamodb-encryption-python/issues/66
    """Transparently encrypt multiple items before putting them in a batch request.

    :param callable encrypt_method: Method to use to encrypt items
    :param callable crypto_config_method: Method that accepts a table name string and provides a :class:`CryptoConfig`
    :param callable write_method: Method that writes to the table
    :param ``**kwargs``: Keyword arguments to pass to ``write_method``
    :return: DynamoDB response
    :rtype: dict
    """
    request_crypto_config = kwargs.pop("crypto_config", None)
    table_crypto_configs = {}
    plaintext_items = copy.deepcopy(kwargs["RequestItems"])

    for table_name, items in kwargs["RequestItems"].items():
        if request_crypto_config is not None:
            crypto_config = request_crypto_config
        else:
            crypto_config = crypto_config_method(table_name=table_name)
        table_crypto_configs[table_name] = crypto_config

        for pos, value in enumerate(items):
            for request_type, item in value.items():
                # We don't encrypt primary indexes, so we can ignore DeleteItem requests
                if request_type == "PutRequest":
                    items[pos][request_type]["Item"] = encrypt_method(
                        item=item["Item"],
                        crypto_config=crypto_config.with_item(_item_transformer(encrypt_method)(item["Item"])),
                    )

    response = write_method(**kwargs)
    return _process_batch_write_response(plaintext_items, response, table_crypto_configs)