in mysqloperator/controller/utils.py [0:0]
def merge_patch_object(base: dict, patch: dict, prefix: str = "", key: str = "", none_deletes: bool = False) -> None:
assert not key, "not implemented" # TODO support key
if type(base) != type(patch):
raise ValueError(f"Invalid type in patch at {prefix}")
if type(base) != dict:
raise ValueError(f"Invalid type in base at {prefix}")
def get_named_object(l, name):
for o in l:
assert type(o) == dict, f"{prefix}: {name} = {o}"
if o["name"] == name:
return o
return None
for k, v in patch.items():
ov = base.get(k)
if ov is not None:
if type(ov) == dict:
if type(v) != dict:
# TODO
raise ValueError(f"Invalid type in {prefix}")
else:
merge_patch_object(ov, v, prefix+"."+k, none_deletes=none_deletes)
elif type(ov) == list:
if type(v) != list:
# TODO
raise ValueError(f"Invalid type in {prefix}")
else:
if not ov:
base[k] = v
else:
if type(v[0]) != dict:
base[k] = v
else:
# When merging lists of objects, we matching objects by name
# If there's no matching object, we append
# If there's a matching object, recursively patch
for i, elem in enumerate(v):
if type(elem) != dict:
raise ValueError(
f"Invalid type in {prefix}")
name = elem.get("name")
if not name:
raise ValueError(
"Object in list must have name")
o = get_named_object(ov, name)
if o:
merge_patch_object(
o, elem, prefix+"."+k+"["+str(i)+"]",
none_deletes=none_deletes)
else:
ov.append(elem)
elif type(ov) not in (dict, list) and type(v) in (dict, list):
raise ValueError(f"Invalid type in {prefix}")
else:
if none_deletes and v is None:
del base[k]
else:
base[k] = v
else:
if none_deletes and v is None:
pass
else:
base[k] = v