def process_single_file()

in src/aws_encryption_sdk_cli/internal/io_handling.py [0:0]


    def process_single_file(self, stream_args, source, destination):
        # type: (STREAM_KWARGS, str, str) -> None
        """Processes a single encrypt/decrypt operation on a source file.

        If ``source`` and ``destination`` resolve to the same real path, the file is
        skipped with a warning. Otherwise the source file is opened in binary mode and
        handed to ``process_single_operation`` together with a copy of ``stream_args``
        that has ``source_length`` filled in from the on-disk file size.

        If the operation does not complete (any exception, including interrupts), the
        result is treated as ``OperationResult.FAILED``. Whenever the final result reports
        ``needs_cleanup`` and the destination is not ``-``, the destination file is removed
        on a best-effort basis. Exceptions raised by the operation are propagated.

        :param dict stream_args: kwargs to pass to `aws_encryption_sdk.stream`
        :param str source: Full file path to source file
        :param str destination: Full file path to destination file
        """
        if os.path.realpath(source) == os.path.realpath(destination):
            # File source, directory destination, empty suffix:
            _LOGGER.warning("Skipping because the source (%s) and destination (%s) are the same", source, destination)
            return

        _LOGGER.info("%sing file %s to %s", stream_args["mode"], source, destination)

        # Shallow copy so the caller's kwargs are not mutated when source_length is added.
        _stream_args = copy.copy(stream_args)
        # Because we can actually know size for files and Base64IO does not support seeking,
        # set the source length manually for files. This enables data key caching when
        # Base64-decoding a source file.
        source_file_size = os.path.getsize(source)
        if self.decode_input and not self.encode_output:
            # Base64 decoding yields 3 bytes for every 4 encoded bytes. Integer arithmetic
            # keeps the result exact and independent of whether true division is in effect.
            _stream_args["source_length"] = source_file_size * 3 // 4
        else:
            _stream_args["source_length"] = source_file_size

        # Assume failure until the operation returns. This keeps ``operation_result`` bound
        # in the ``finally`` clause for every exit path, including KeyboardInterrupt and
        # SystemExit, which are not subclasses of Exception.
        operation_result = OperationResult.FAILED
        try:
            with open(os.path.abspath(source), "rb") as source_reader:
                operation_result = self.process_single_operation(
                    stream_args=_stream_args, source=source_reader, destination=destination
                )
        finally:
            if operation_result.needs_cleanup and destination != "-":  # pylint: disable=no-member
                _LOGGER.warning("Operation failed: deleting output file: %s", destination)
                try:
                    os.remove(destination)
                except OSError:
                    # if the file doesn't exist that's ok too
                    pass