in src/aws_encryption_sdk_cli/__init__.py [0:0]
def process_cli_request(stream_args, parsed_args): # noqa: C901
# type: (STREAM_KWARGS, Namespace) -> None
"""Maps the operation request to the appropriate function based on the type of input and output provided.
:param dict stream_args: kwargs to pass to `aws_encryption_sdk.stream`
:param args: Parsed arguments from argparse
:type args: argparse.Namespace
"""
_catch_bad_destination_requests(parsed_args.output)
_catch_bad_metadata_file_requests(
metadata_output=parsed_args.metadata_output, source=parsed_args.input, destination=parsed_args.output
)
_catch_bad_stdin_stdout_requests(parsed_args.input, parsed_args.output)
if not parsed_args.commitment_policy:
commitment_policy = CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
elif parsed_args.commitment_policy == CommitmentPolicyArgs.FORBID_ENCRYPT_ALLOW_DECRYPT:
commitment_policy = CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT
elif parsed_args.commitment_policy == CommitmentPolicyArgs.REQUIRE_ENCRYPT_ALLOW_DECRYPT:
commitment_policy = CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT
elif parsed_args.commitment_policy == CommitmentPolicyArgs.REQUIRE_ENCRYPT_REQUIRE_DECRYPT:
commitment_policy = CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
else:
raise BadUserArgumentError("Invalid commitment policy.")
handler = IOHandler(
metadata_writer=parsed_args.metadata_output,
interactive=parsed_args.interactive,
no_overwrite=parsed_args.no_overwrite,
decode_input=parsed_args.decode,
encode_output=parsed_args.encode,
required_encryption_context=parsed_args.encryption_context,
required_encryption_context_keys=parsed_args.required_encryption_context_keys,
commitment_policy=commitment_policy,
buffer_output=parsed_args.buffer,
max_encrypted_data_keys=parsed_args.max_encrypted_data_keys,
)
if parsed_args.input == "-":
# read from stdin
handler.process_single_operation(
stream_args=stream_args, source=parsed_args.input, destination=parsed_args.output
)
return
expanded_sources = _expand_sources(parsed_args.input)
_catch_bad_file_and_directory_requests(expanded_sources, parsed_args.output)
for _source in expanded_sources:
_destination = copy.copy(parsed_args.output)
if os.path.isdir(_source):
if not parsed_args.recursive:
_LOGGER.warning("Skipping %s because it is a directory and -r/-R/--recursive is not set", _source)
continue
handler.process_dir(
stream_args=stream_args, source=_source, destination=_destination, suffix=parsed_args.suffix
)
elif os.path.isfile(_source):
if os.path.isdir(parsed_args.output):
# create new filename
_destination = output_filename(
source_filename=_source,
destination_dir=_destination,
mode=str(stream_args["mode"]),
suffix=parsed_args.suffix,
)
# write to file
handler.process_single_file(stream_args=stream_args, source=_source, destination=_destination)