in bugbug/bug_snapshot.py [0:0]
def rollback(bug, when=None, do_assert=False):
    """Roll a bug dictionary back to an earlier point of its history.

    The entries of ``bug["history"]`` are walked from last to first and every
    supported change is undone on ``bug`` itself: values listed as "added" are
    removed again and values listed as "removed" are restored.  Comments and
    attachments created after the rollback date are dropped afterwards.

    Args:
        bug: Bug dictionary.  The code reads ``id``, ``product``, ``history``,
            ``comments``, ``attachments``, ``creator`` and ``creation_time``,
            plus ``flags`` on the bug (and on its attachments) when flag
            changes are undone.  It is modified in place.
        when: Optional callable that receives a single change dictionary and
            returns a truthy value for the change to stop at.  The first
            matching change (in ``bug["history"]`` order) is the stopping
            point; that change and everything stored before it are left
            applied.  When ``None``, every history entry is undone and the
            bug's creation time is used as the rollback date.
        do_assert: When true, any detected inconsistency raises
            ``AssertionError``; otherwise it is reported via ``logger.error``.

    Returns:
        The same ``bug`` object.  If ``when`` is given but matches no change,
        the bug is returned untouched.

    Raises:
        AssertionError: If ``do_assert`` is true and an inconsistency between
            the history and the current bug state is found.
    """

    def assert_or_log(msg):
        """Report an inconsistency: raise if ``do_assert`` is set, else log it."""
        msg = f"{msg}, in bug {bug['id']}"
        if do_assert:
            # Raise explicitly instead of using an ``assert`` statement, which
            # is compiled away (and would stay silent) under ``python -O``.
            raise AssertionError(msg)
        logger.error(msg)

    def parse_flag_change(change):
        """Split a flag token such as ``name?(requestee)`` into its parts.

        Returns a ``(name, status, requestee)`` tuple; ``requestee`` is
        ``None`` when the token has no parenthesised part, and all three are
        ``None`` when the token cannot be parsed.
        """
        parts = change.split("(")
        if len(parts) not in (1, 2):
            assert_or_log(f"Too many parts for {change}")
            return None, None, None

        name_and_status = parts[0]
        name = name_and_status[:-1]
        # Slice rather than index so an empty prefix is reported as an
        # unexpected status below instead of raising IndexError.
        status = name_and_status[-1:]
        if status not in ("?", "+", "-"):
            assert_or_log(f"unexpected status: {status}")
            return None, None, None

        # Drop the trailing ")" of the requestee part.
        requestee = parts[1][:-1] if len(parts) == 2 else None
        return name, status, requestee

    def created_by_rollback_date(item):
        """Tell whether ``item`` was created no later than 3s after the rollback date."""
        # NOTE(review): the 3-second slack presumably absorbs small timestamp
        # differences between a history entry and its comment/attachment.
        created = dateutil.parser.parse(item["creation_time"])
        return created - relativedelta(seconds=3) <= rollback_date

    # Changes to these fields are not rolled back.
    ignored_fields = frozenset(
        {
            # TODO: Ignore this for now, not so easy to make it work https://bugzilla.mozilla.org/show_bug.cgi?id=1513952.
            "component",
            # TODO: Ignore this for now. Example usage in 92144.
            "qa_contact",
            # TODO: Ignore this for now. Example usage in 1101478.
            "cf_fx_iteration",
            # TODO: Ignore this for now. Example usage in 1437575.
            "cf_crash_signature",
            # TODO: Ignore this for now. Example usage in 1048455.
            "cf_backlog",
            # TODO: Ignore this for now. Example usage in 1042103.
            "bug_mentor",
            # TODO: Ignore this for now. Example usage in 1369255.
            # Seems to be broken in Bugzilla.
            "cf_user_story",
            # TODO: Ignore this for now. Example usage in 1475099.
            "cf_rank",
            "alias",
            "restrict_comments",
            # Ignore for now.
            "longdescs.isprivate",
            # TODO: Ignore this for now. Example usage in 1162372 or 1389926.
            "version",
            # We don't support comment tags yet.
            "comment_tag",
        }
    )

    # TODO: Skip needinfo/reviews for now, we need a way to match them precisely when there are multiple needinfos/reviews requested.
    question_flag_prefixes = (
        "needinfo",
        "review",
        "feedback",
        "ui-review",
        "sec-approval",
        "sec-review",
        "data-review",
        "approval-mozilla-",
    )

    last_product = bug["product"]

    # Locate the change to stop at and the date the bug is rolled back to.
    change_to_return = None
    if when is not None:
        for history in bug["history"]:
            for change in history["changes"]:
                if when(change):
                    change_to_return = change
                    rollback_date = dateutil.parser.parse(history["when"])
                    break

            if change_to_return is not None:
                break

        if change_to_return is None:
            # Nothing matched: there is no point to roll back to.
            return bug
    else:
        rollback_date = dateutil.parser.parse(bug["creation_time"])

    reached_target = False

    for history in reversed(bug["history"]):
        # TODO: Handle changes to product and component.
        # TODO: This code might be removed when https://bugzilla.mozilla.org/show_bug.cgi?id=1513952 is fixed.

        if reached_target:
            break

        for change in history["changes"]:
            # Identity, not equality: two distinct changes may compare equal.
            if change is change_to_return:
                reached_target = True
                break

            field = change["field_name"]

            if field in ignored_fields:
                continue

            if "attachment_id" in change and field.startswith("attachments"):
                # TODO: Ignore changes to attachments for now.
                continue

            if field == "flagtypes.name":
                # Flags live either on an attachment or on the bug itself.
                if "attachment_id" in change:
                    # https://bugzilla.mozilla.org/show_bug.cgi?id=1516172
                    if bug["id"] == 1_421_395:
                        continue

                    obj = None
                    for attachment in bug["attachments"]:
                        if attachment["id"] == change["attachment_id"]:
                            obj = attachment
                            break

                    if obj is None:
                        assert_or_log(f"Attachment {change['attachment_id']} not found")
                        continue
                else:
                    obj = bug

                if change["added"]:
                    # Undo additions: drop each added flag from the object.
                    for to_remove in change["added"].split(", "):
                        is_question_flag = to_remove.startswith(question_flag_prefixes)

                        name, status, requestee = parse_flag_change(to_remove)

                        found_flag = None
                        for f in obj["flags"]:
                            if (
                                f["name"] == name
                                and f["status"] == status
                                and (
                                    requestee is None
                                    or (
                                        "requestee" in f and f["requestee"] == requestee
                                    )
                                )
                            ):
                                if (
                                    found_flag is not None
                                    and not is_expected_inconsistent_change_flag(
                                        to_remove, obj["id"]
                                    )
                                    and not is_question_flag
                                ):
                                    flag_text = f"{f['name']}{f['status']}"
                                    if "requestee" in f:
                                        flag_text = f"{flag_text}{f['requestee']}"
                                    assert_or_log(f"{flag_text} found twice!")

                                # The last matching flag is the one removed.
                                found_flag = f

                        if found_flag is not None:
                            obj["flags"].remove(found_flag)
                        elif (
                            not is_expected_inconsistent_change_flag(
                                to_remove, obj["id"]
                            )
                            and not is_question_flag
                        ):
                            assert_or_log(
                                f"flag {to_remove} not found, in obj {obj['id']}"
                            )

                if change["removed"]:
                    # Inconsistent review and needinfo flags.
                    if bug["id"] in [785931, 1_342_178]:
                        continue

                    # Undo removals: put each removed flag back.
                    for to_add in change["removed"].split(", "):
                        name, status, requestee = parse_flag_change(to_add)

                        new_flag = {"name": name, "status": status}
                        if requestee is not None:
                            new_flag["requestee"] = requestee

                        obj["flags"].append(new_flag)

                continue

            if field == "comment_revision":
                obj = None
                for comment in bug["comments"]:
                    if comment["id"] == change["comment_id"]:
                        obj = comment
                        break

                if obj is None:
                    if change["comment_id"] != 14096735:
                        assert_or_log(f"Comment {change['comment_id']} not found")
                    continue

                if obj["count"] != change["comment_count"]:
                    assert_or_log("Wrong comment count")

                # TODO: It should actually be applied on "raw_text".
                # if obj["text"] != change["added"]:
                #     assert_or_log(f"Current value for comment: ({obj['text']}) is different from previous value: ({change['added']}")
                obj["text"] = change["removed"]

                continue

            if change["added"] != "---":
                if field not in bug and not is_expected_inconsistent_field(
                    field, last_product, bug["id"]
                ):
                    assert_or_log(f"{field} is not present")

            if field in bug and isinstance(bug[field], list):
                # List-valued field: undo the change element by element.
                if change["added"]:
                    for to_remove in change["added"].split(", "):
                        if field in FIELD_TYPES:
                            try:
                                to_remove = FIELD_TYPES[field](to_remove)
                            except Exception:
                                assert_or_log(
                                    f"Exception while transforming {to_remove} from {bug[field]} (field {field})"
                                )

                        if to_remove in bug[field]:
                            bug[field].remove(to_remove)
                        elif not is_expected_inconsistent_change_list_field(
                            field, bug["id"], to_remove
                        ):
                            assert_or_log(
                                f"{to_remove} is not in {bug[field]}, for field {field}"
                            )

                if change["removed"]:
                    for to_add in change["removed"].split(", "):
                        if field in FIELD_TYPES:
                            try:
                                to_add = FIELD_TYPES[field](to_add)
                            except Exception:
                                assert_or_log(
                                    f"Exception while transforming {to_add} from {bug[field]} (field {field})"
                                )

                        bug[field].append(to_add)
            else:
                # Scalar field.  Start from the raw strings so that a failed
                # conversion (in logging mode) falls back to them, as the list
                # branch does, instead of leaving the names unbound or holding
                # values from a previous change.
                old_value = change["removed"]
                new_value = change["added"]
                if field in FIELD_TYPES:
                    convert = FIELD_TYPES[field]
                    # ``bug.get``: the field may be absent from the bug here.
                    try:
                        old_value = convert(change["removed"])
                    except Exception:
                        assert_or_log(
                            f"Exception while transforming {change['removed']} from {bug.get(field)} (field {field})"
                        )
                    try:
                        new_value = convert(change["added"])
                    except Exception:
                        assert_or_log(
                            f"Exception while transforming {change['added']} from {bug.get(field)} (field {field})"
                        )

                if (
                    field in bug
                    and bug[field] != new_value
                    and not is_expected_inconsistent_change_field(
                        field, bug["id"], new_value, bug[field]
                    )
                ):
                    assert_or_log(
                        f"Current value for field {field}: ({bug[field]}) is different from previous value: ({new_value})"
                    )

                bug[field] = old_value

    if not bug["comments"]:
        assert_or_log("There must be at least one comment")
        bug["comments"] = [
            {
                "count": 0,
                "id": 0,
                "text": "",
                "author": bug["creator"],
                "creation_time": bug["creation_time"],
            }
        ]

    # If the first comment is hidden.
    if bug["comments"][0]["count"] != 0:
        # NOTE(review): unlike the placeholder above, this one carries no
        # "count" key - confirm whether that is intentional.
        bug["comments"].insert(
            0,
            {
                "id": 0,
                "text": "",
                "author": bug["creator"],
                "creation_time": bug["creation_time"],
            },
        )

    # Keep only what already existed at the rollback date.
    bug["comments"] = [c for c in bug["comments"] if created_by_rollback_date(c)]
    bug["attachments"] = [a for a in bug["attachments"] if created_by_rollback_date(a)]

    return bug