in src/aws_encryption_sdk_cli/internal/io_handling.py [0:0]
def process_single_operation(self, stream_args, source, destination):
    # type: (STREAM_KWARGS, SOURCE, str) -> OperationResult
    """Run one encrypt/decrypt operation from an already-resolved source to a destination.

    A destination of ``"-"`` selects the writer returned by ``_stdout()``; any other
    value is treated as a file path, which is opened for binary writing once
    ``_should_write_file`` has approved it and its directory has been ensured.
    A source of ``"-"`` is replaced by the reader returned by ``_stdin()``.

    The destination writer is always closed before this method returns or raises,
    including when it is the stdout writer.

    :param dict stream_args: kwargs to pass to `aws_encryption_sdk.stream`
    :param source: source to read from (``"-"`` selects stdin)
    :type source: str or file-like object
    :param str destination: destination identifier (``"-"`` selects stdout)
    :returns: ``OperationResult.SKIPPED`` if the destination file should not be written,
        otherwise whatever ``_single_io_write`` returns
    :rtype: aws_encryption_sdk_cli.internal.identifiers.OperationResult
    """
    to_stdout = destination == "-"

    # Bail out before touching the filesystem if this file must not be written.
    if not to_stdout and not self._should_write_file(destination):
        return OperationResult.SKIPPED

    if to_stdout:
        output_stream = _stdout()
    else:
        _ensure_dir_exists(destination)
        # The handle is released in the ``finally`` clause below rather than by a ``with`` block.
        # pylint: disable=consider-using-with
        output_stream = open(os.path.abspath(destination), "wb")

    input_stream = _stdin() if source == "-" else source

    try:
        outcome = self._single_io_write(
            stream_args=stream_args,
            source=cast(IO, input_stream),
            destination_writer=output_stream,
        )
    finally:
        output_stream.close()
    return outcome