in src/aws_encryption_sdk_cli/internal/io_handling.py [0:0]
def _single_io_write(self, stream_args, source, destination_writer):
# type: (STREAM_KWARGS, IO, IO) -> OperationResult
"""Performs the actual write operations for a single operation.
:param dict stream_args: kwargs to pass to `aws_encryption_sdk.stream`
:param source: source to write
:type source: file-like object
:param destination_writer: destination object to which to write
:type destination_writer: file-like object
:returns: OperationResult stating whether the file was written
:rtype: aws_encryption_sdk_cli.internal.identifiers.OperationResult
"""
with _encoder(source, self.decode_input) as _source, _encoder(
destination_writer, self.encode_output
) as _destination: # noqa pylint: disable=line-too-long
with self.client.stream(source=_source, **stream_args) as handler, self.metadata_writer as metadata:
metadata_kwargs = dict(
mode=stream_args["mode"],
input=source.name,
output=destination_writer.name,
header=json_ready_header(handler.header),
)
try:
header_auth = handler.header_auth
except AttributeError:
# EncryptStream doesn't expose the header auth at this time
pass
else:
metadata_kwargs["header_auth"] = json_ready_header_auth(header_auth)
if _is_decrypt_mode(str(stream_args["mode"])):
discovered_ec = handler.header.encryption_context
missing_keys = set(self.required_encryption_context_keys).difference(set(discovered_ec.keys()))
missing_pairs = set(self.required_encryption_context.items()).difference(set(discovered_ec.items()))
if missing_keys or missing_pairs:
_LOGGER.warning(
"Skipping decrypt because discovered encryption context did not match required elements."
)
metadata_kwargs.update(
dict(
skipped=True,
reason="Missing encryption context key or value",
missing_encryption_context_keys=list(missing_keys),
missing_encryption_context_pairs=list(missing_pairs),
)
)
metadata.write_metadata(**metadata_kwargs)
return OperationResult.FAILED_VALIDATION
metadata.write_metadata(**metadata_kwargs)
if self.buffer_output:
_destination.write(handler.read())
else:
for chunk in handler:
_destination.write(chunk)
_destination.flush()
return OperationResult.SUCCESS