in src/aaz_dev/command/controller/cfg_reader.py [0:0]
def iter_schema_cls_reference_in_operations(cls, operations, cls_name):
assert isinstance(cls_name, str) and not cls_name.startswith('@')
cls_type_name = f"@{cls_name}"
def schema_filter(_parent, _schema, _schema_idx):
if _schema.type == cls_type_name:
# find match
return (_parent, _schema, _schema_idx), False
return None, False
for op in operations:
if isinstance(op, CMDHttpOperation):
if op.http.request:
for parent, schema, schema_idx in cls._iter_schema_in_request(op.http.request, schema_filter=schema_filter):
if schema:
schema_idx = [_SchemaIdxEnum.Http, _SchemaIdxEnum.Request, *schema_idx]
yield parent, schema, schema_idx
if op.http.responses:
for response in op.http.responses:
if response.is_error:
continue
schema_idx_prefix = [_SchemaIdxEnum.Http, '_'.join([_SchemaIdxEnum.Response, *[str(code) for code in response.status_codes]])]
for parent, schema, schema_idx in cls._iter_schema_in_response(response, schema_filter=schema_filter):
if schema:
schema_idx = [*schema_idx_prefix, *schema_idx]
yield parent, schema, schema_idx
if isinstance(op, CMDInstanceUpdateOperation):
if isinstance(op.instance_update, CMDJsonInstanceUpdateAction):
for parent, schema, schema_idx in cls._iter_schema_in_json(op.instance_update.json, schema_filter=schema_filter):
if schema:
schema_idx = [_SchemaIdxEnum.Instance, _SchemaIdxEnum.Update, *schema_idx]
yield parent, schema, schema_idx
if isinstance(op, CMDInstanceCreateOperation):
if isinstance(op.instance_create, CMDJsonInstanceCreateAction):
for parent, schema, schema_idx in cls._iter_schema_in_json(op.instance_create.json, schema_filter=schema_filter):
if schema:
schema_idx = [_SchemaIdxEnum.Instance, _SchemaIdxEnum.Create, *schema_idx]
yield parent, schema, schema_idx