def _single_io_write()

in src/aws_encryption_sdk_cli/internal/io_handling.py [0:0]


    def _single_io_write(self, stream_args, source, destination_writer):
        # type: (STREAM_KWARGS, IO, IO) -> OperationResult
        """Stream one source through the crypto client into one destination.

        Operation metadata is always written before any output data. In decrypt
        mode, the encryption context found in the message header is first compared
        against the required keys and key/value pairs; on a mismatch the metadata
        is flagged as skipped and nothing is written to the destination.

        :param dict stream_args: kwargs forwarded to ``self.client.stream``; its ``mode`` entry is also read here
        :param source: source to read from (its ``name`` is recorded in the metadata)
        :type source: file-like object
        :param destination_writer: destination to write to (its ``name`` is recorded in the metadata)
        :type destination_writer: file-like object
        :returns: ``SUCCESS`` after the output is written, ``FAILED_VALIDATION`` when the decrypt is skipped
        :rtype: aws_encryption_sdk_cli.internal.identifiers.OperationResult
        """
        # Sentinel so that a legitimately-``None`` header auth is still recorded.
        no_header_auth = object()

        with _encoder(source, self.decode_input) as reader:
            with _encoder(destination_writer, self.encode_output) as writer:
                with self.client.stream(source=reader, **stream_args) as handler, self.metadata_writer as metadata:
                    metadata_kwargs = {
                        "mode": stream_args["mode"],
                        "input": source.name,
                        "output": destination_writer.name,
                        "header": json_ready_header(handler.header),
                    }

                    # EncryptStream doesn't expose the header auth at this time,
                    # so only record it when the handler actually provides it.
                    header_auth = getattr(handler, "header_auth", no_header_auth)
                    if header_auth is not no_header_auth:
                        metadata_kwargs["header_auth"] = json_ready_header_auth(header_auth)

                    skip_decrypt = False
                    if _is_decrypt_mode(str(stream_args["mode"])):
                        found_context = handler.header.encryption_context
                        absent_keys = set(self.required_encryption_context_keys) - set(found_context.keys())
                        absent_pairs = set(self.required_encryption_context.items()) - set(found_context.items())
                        skip_decrypt = bool(absent_keys or absent_pairs)

                    if skip_decrypt:
                        _LOGGER.warning(
                            "Skipping decrypt because discovered encryption context did not match required elements."
                        )
                        metadata_kwargs.update(
                            skipped=True,
                            reason="Missing encryption context key or value",
                            missing_encryption_context_keys=list(absent_keys),
                            missing_encryption_context_pairs=list(absent_pairs),
                        )

                    # Metadata goes out first in both the skipped and the normal case.
                    metadata.write_metadata(**metadata_kwargs)
                    if skip_decrypt:
                        return OperationResult.FAILED_VALIDATION

                    if not self.buffer_output:
                        # Push each chunk to the destination as soon as it is produced.
                        for piece in handler:
                            writer.write(piece)
                            writer.flush()
                    else:
                        # Read the whole result before writing anything.
                        writer.write(handler.read())
        return OperationResult.SUCCESS