in src/aaz_dev/command/controller/workspace_cfg_editor.py [0:0]
def _generate_sub_commands(cls, schema, subresource_idx, update_cmd, ref_args_options=None):
update_op = None
for operation in update_cmd.operations:
if isinstance(operation, CMDInstanceUpdateOperation):
update_op = operation
assert update_op is not None
# find arg
if schema.arg:
arg_var = schema.arg
arg, arg_idx = cls.find_arg_in_command_by_var(update_cmd, arg_var)
if not arg or not arg_idx:
raise exceptions.InvalidAPIUsage(f"Argument '{arg_var}' not exist in command")
else:
# schema is flatten
arg = None
# build ref_args
ref_args = []
update_json = update_op.instance_update.json
update_arg_prefix = f"${update_json.schema.name}"
if update_cmd.arg_groups:
for g in update_cmd.arg_groups:
for a in g.args:
if a.var.startswith(update_arg_prefix):
# ignore arguments linked with body
continue
if 'name' in a.options and isinstance(a, CMDStringArgBase):
# remove auto add 'name', 'n' options
a = a.__class__(raw_data=a.to_native())
a.options = sorted(a.options, key=lambda o: (len(o), o))[-1:] # use the longest argument
ref_args.append(a)
if isinstance(schema, CMDArraySchema):
assert isinstance(arg, CMDArrayArg)
# list command
list_command = cls._build_subresource_list_or_show_command(
update_cmd=update_cmd,
subresource_idx=subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
# generate item command
item_subresource_idx = [*subresource_idx, '[]']
item_arg = arg.item
if isinstance(item_arg, CMDClsArgBase):
raise exceptions.InvalidAPIUsage(f"Not support array<class> arg={arg_var}, please unwrap it first.")
if not isinstance(item_arg, CMDObjectArgBase):
raise NotImplementedError()
if item_arg.additional_props:
raise NotImplementedError()
ref_args.extend(item_arg.args)
# create command
create_command = cls._build_subresource_create_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options,
)
# update command
update_command = cls._build_subresource_update_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options,
)
# delete command
delete_command = cls._build_subresource_delete_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options,
)
# show command
show_command = cls._build_subresource_list_or_show_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options,
)
list_command.name = 'list'
create_command.name = 'create'
update_command.name = 'update'
delete_command.name = 'delete'
show_command.name = 'show'
return [list_command, create_command, update_command, delete_command, show_command]
elif isinstance(schema, CMDObjectSchema):
if schema.props and schema.additional_props:
raise NotImplementedError()
if schema.props:
if arg:
assert isinstance(arg, CMDObjectArg)
ref_args.extend(arg.args)
# create command
create_command = cls._build_subresource_create_command(
update_cmd=update_cmd,
subresource_idx=subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
# update command
update_command = cls._build_subresource_update_command(
update_cmd=update_cmd,
subresource_idx=subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
# delete command
delete_command = cls._build_subresource_delete_command(
update_cmd=update_cmd,
subresource_idx=subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
# show command
show_command = cls._build_subresource_list_or_show_command(
update_cmd=update_cmd,
subresource_idx=subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
elif schema.additional_props and schema.additional_props.item:
assert isinstance(arg, CMDObjectArg)
item_subresource_idx = [*subresource_idx, '{}']
item_arg = arg.additional_props.item
if isinstance(item_arg, CMDClsArgBase):
raise exceptions.InvalidAPIUsage(f"Not support dict<class> arg={arg_var}, please unwrap it first.")
if not isinstance(item_arg, CMDObjectSchemaBase):
raise NotImplementedError()
if item_arg.additional_props:
raise NotImplementedError()
ref_args.extend(item_arg.args)
# create command
create_command = cls._build_subresource_create_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
# update command
update_command = cls._build_subresource_update_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
# delete command
delete_command = cls._build_subresource_delete_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
# show command
show_command = cls._build_subresource_list_or_show_command(
update_cmd=update_cmd,
subresource_idx=item_subresource_idx,
ref_args=ref_args,
ref_options=ref_args_options
)
else:
raise NotImplementedError()
create_command.name = 'create'
update_command.name = 'update'
delete_command.name = 'delete'
show_command.name = 'show'
return [create_command, update_command, delete_command, show_command]
else:
raise NotImplementedError()