in src/dynamodb_encryption_sdk/material_providers/store/meta.py [0:0]
def _save_materials(self, material_name, version, encryption_key, signing_key):
    # type: (Text, int, JceNameLocalDelegatedKey, JceNameLocalDelegatedKey) -> None
    """Save materials to the table, raising an error if the version already exists.

    The item is written with a condition requiring that neither the partition
    attribute nor the sort attribute already exists, so the write is rejected
    rather than replacing an item stored under the same name and version.

    :param str material_name: Name of the material to save
    :param int version: Version of the material to save
    :param encryption_key: Key whose ``algorithm`` and ``key`` are stored as the encryption key
    :type encryption_key: JceNameLocalDelegatedKey
    :param signing_key: Key whose ``algorithm`` and ``key`` are stored as the integrity key
    :type signing_key: JceNameLocalDelegatedKey
    :raises VersionAlreadyExistsError: if the specified version already exists
    :raises botocore.exceptions.ClientError: if the write fails for any other reason
    """
    _LOGGER.debug('Saving material "%s" version %d to MetaStore table', material_name, version)
    item = {
        MetaStoreAttributeNames.PARTITION.value: material_name,
        MetaStoreAttributeNames.SORT.value: version,
        MetaStoreAttributeNames.MATERIAL_TYPE_VERSION.value: MetaStoreValues.MATERIAL_TYPE_VERSION.value,
        MetaStoreAttributeNames.ENCRYPTION_ALGORITHM.value: encryption_key.algorithm,
        MetaStoreAttributeNames.ENCRYPTION_KEY.value: Binary(encryption_key.key),
        MetaStoreAttributeNames.INTEGRITY_ALGORITHM.value: signing_key.algorithm,
        MetaStoreAttributeNames.INTEGRITY_KEY.value: Binary(signing_key.key),
    }
    try:
        self._encrypted_table.put_item(
            Item=item,
            ConditionExpression=(
                Attr(MetaStoreAttributeNames.PARTITION.value).not_exists()
                & Attr(MetaStoreAttributeNames.SORT.value).not_exists()
            ),
        )
    except botocore.exceptions.ClientError as error:
        if error.response["Error"]["Code"] == "ConditionalCheckFailedException":
            raise VersionAlreadyExistsError('Version already exists: "{}#{}"'.format(material_name, version))
        # Any other client error means the item was not written. Propagate it
        # instead of returning normally, so callers never treat unsaved
        # materials as persisted.
        raise