in src/aaz_dev/swagger/controller/command_generator.py [0:0]
def create_draft_command_group(self, resource,
instance_var,
update_by=None,
methods=('get', 'delete', 'put', 'post', 'head', 'patch'),
**kwargs):
command_group = CMDCommandGroup()
command_group.commands = []
path_item = self.get_path_item(resource)
parameterized_host = self.get_parameterized_host(resource)
if path_item.get is not None and 'get' in methods:
cmd_builder = CMDBuilder(path=resource.path, method='get', mutability=MutabilityEnum.Read,
parameterized_host=parameterized_host)
op = self.generate_operation(cmd_builder, path_item, instance_var)
show_or_list_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
command_group.commands.append(show_or_list_command)
if path_item.delete is not None and 'delete' in methods:
cmd_builder = CMDBuilder(path=resource.path, method='delete', mutability=MutabilityEnum.Create,
parameterized_host=parameterized_host)
op = self.generate_operation(cmd_builder, path_item, instance_var)
delete_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
delete_command.confirmation = DEFAULT_CONFIRMATION_PROMPT # add confirmation for delete command by default
command_group.commands.append(delete_command)
if path_item.put is not None and 'put' in methods:
cmd_builder = CMDBuilder(path=resource.path, method='put', mutability=MutabilityEnum.Create,
parameterized_host=parameterized_host)
op = self.generate_operation(cmd_builder, path_item, instance_var)
create_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
command_group.commands.append(create_command)
if path_item.post is not None and 'post' in methods:
cmd_builder = CMDBuilder(path=resource.path, method='post', mutability=MutabilityEnum.Create,
parameterized_host=parameterized_host)
op = self.generate_operation(cmd_builder, path_item, instance_var)
action_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
command_group.commands.append(action_command)
if path_item.head is not None and 'head' in methods:
cmd_builder = CMDBuilder(path=resource.path, method='head', mutability=MutabilityEnum.Read,
parameterized_host=parameterized_host)
op = self.generate_operation(cmd_builder, path_item, instance_var)
head_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
command_group.commands.append(head_command)
# update command
if update_by is None:
update_by_patch_command = None
update_by_generic_command = None
if path_item.patch is not None and 'patch' in methods:
cmd_builder = CMDBuilder(path=resource.path, method='patch', mutability=MutabilityEnum.Update,
parameterized_host=parameterized_host)
op = self.generate_operation(cmd_builder, path_item, instance_var)
update_by_patch_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
if path_item.get is not None and path_item.put is not None and 'get' in methods and 'put' in methods:
cmd_builder = CMDBuilder(path=resource.path,
parameterized_host=parameterized_host)
get_op = self.generate_operation(
cmd_builder, path_item, instance_var, method='get', mutability=MutabilityEnum.Read)
put_op = self.generate_operation(
cmd_builder, path_item, instance_var, method='put', mutability=MutabilityEnum.Update)
update_by_generic_command = self.generate_generic_update_command(path_item, resource, instance_var, cmd_builder, get_op, put_op)
# generic update command first, patch update command after that
if update_by_generic_command:
command_group.commands.append(update_by_generic_command)
elif update_by_patch_command:
command_group.commands.append(update_by_patch_command)
else:
if update_by == 'GenericOnly':
if path_item.get is None or path_item.put is None:
raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: resource needs to have 'get' and 'put' operations: '{resource}'")
if 'get' not in methods or 'put' not in methods:
raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: '{resource}': 'get' or 'put' not in methods: '{methods}'")
cmd_builder = CMDBuilder(path=resource.path,
parameterized_host=parameterized_host)
get_op = self.generate_operation(
cmd_builder, path_item, instance_var, method='get', mutability=MutabilityEnum.Read)
put_op = self.generate_operation(
cmd_builder, path_item, instance_var, method='put', mutability=MutabilityEnum.Update)
generic_update_command = self.generate_generic_update_command(path_item, resource, instance_var, cmd_builder, get_op, put_op)
if generic_update_command is None:
raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: failed to generate generic update: '{resource}'")
command_group.commands.append(generic_update_command)
# elif 'update_by' in kwargs:
# logger.error(f'Failed to generate generic update for resource: {resource}')
elif update_by == 'PatchOnly':
if path_item.patch is None:
raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: resource needs to have 'patch' operation: '{resource}'")
if 'patch' not in methods:
raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: '{resource}': 'patch' not in methods: '{methods}'")
cmd_builder = CMDBuilder(path=resource.path, method='patch', mutability=MutabilityEnum.Update,
parameterized_host=parameterized_host)
op = self.generate_operation(cmd_builder, path_item, instance_var)
patch_update_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
command_group.commands.append(patch_update_command)
# elif update_by == 'GenericAndPatch':
# # TODO: add support for generic and patch merge
# if path_item.get is None or path_item.put is None or path_item.patch is None:
# raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: resource needs to have 'get' and 'put' and 'patch' operation: '{resource}'")
# if 'get' not in methods or 'put' not in methods or 'patch' not in methods:
# raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: '{resource}': 'get' or 'put' or 'patch' not in methods: '{methods}'")
# cmd_builder = CMDBuilder(path=resource.path,
# parameterized_host=parameterized_host)
# get_op = self.generate_operation(
# cmd_builder, path_item, instance_var, method='get', mutability=MutabilityEnum.Read)
# put_op = self.generate_operation(
# cmd_builder, path_item, instance_var, method='put', mutability=MutabilityEnum.Update)
# generic_update_command = self.generate_generic_update_command(path_item, resource, instance_var, cmd_builder, get_op, put_op)
# if generic_update_command is None:
# raise exceptions.InvalidAPIUsage(f"Invalid update_by resource: failed to generate generic update: '{resource}'")
# cmd_builder = CMDBuilder(path=resource.path, method='patch', mutability=MutabilityEnum.Update,
# parameterized_host=parameterized_host)
# op = self.generate_operation(cmd_builder, path_item, instance_var)
# patch_update_command = self.generate_command(path_item, resource, instance_var, cmd_builder, op)
# generic_and_patch_update_command = self._merge_update_commands(
# patch_command=patch_update_command, generic_command=generic_update_command
# )
# command_group.commands.append(generic_and_patch_update_command)
elif update_by != 'None':
raise exceptions.InvalidAPIUsage(f"Invalid update_by value: {update_by} : only support ['GenericOnly', 'PatchOnly', 'None'] values")
for command in command_group.commands:
parts = command.name.split(' ')
group_name = ' '.join(parts[:-1])
if command_group.name:
assert group_name == command_group.name
else:
command_group.name = group_name
command.name = parts[-1] # remove the command group name parts
self.optimize_command_description(command)
return command_group