def rollback()

in bugbug/bug_snapshot.py [0:0]


def rollback(bug, when=None, do_assert=False):
    def assert_or_log(msg):
        msg = f"{msg}, in bug {bug['id']}"
        if do_assert:
            assert False, msg
        else:
            logger.error(msg)

    def parse_flag_change(change):
        parts = change.split("(")
        if len(parts) != 1 and len(parts) != 2:
            assert_or_log(f"Too many parts for {change}")
            return None, None, None

        name_and_status = parts[0]
        name = name_and_status[:-1]
        status = name_and_status[-1]
        if status not in ["?", "+", "-"]:
            assert_or_log(f"unexpected status: {status}")
            return None, None, None

        requestee = None if len(parts) != 2 else parts[1][:-1]
        return name, status, requestee

    last_product = bug["product"]

    change_to_return = None
    if when is not None:
        for history in bug["history"]:
            for change in history["changes"]:
                if when(change):
                    change_to_return = change
                    rollback_date = dateutil.parser.parse(history["when"])
                    break

            if change_to_return is not None:
                break

        if change_to_return is None:
            return bug
    else:
        rollback_date = dateutil.parser.parse(bug["creation_time"])

    ret = False

    for history in reversed(bug["history"]):
        # TODO: Handle changes to product and component.
        # TODO: This code might be removed when https://bugzilla.mozilla.org/show_bug.cgi?id=1513952 is fixed.

        if ret:
            break

        for change in history["changes"]:
            if change is change_to_return:
                ret = True
                break

            field = change["field_name"]

            if field in "component":
                # TODO: Ignore this for now, not so easy to make it work https://bugzilla.mozilla.org/show_bug.cgi?id=1513952.
                continue

            if field == "qa_contact":
                # TODO: Ignore this for now. Example usage in 92144.
                continue

            if field == "cf_fx_iteration":
                # TODO: Ignore this for now. Example usage in 1101478.
                continue

            if field == "cf_crash_signature":
                # TODO: Ignore this for now. Example usage in 1437575.
                continue

            if field == "cf_backlog":
                # TODO: Ignore this for now. Example usage in 1048455.
                continue

            if field == "bug_mentor":
                # TODO: Ignore this for now. Example usage in 1042103.
                continue

            if field == "cf_user_story":
                # TODO: Ignore this for now. Example usage in 1369255.
                # Seems to be broken in Bugzilla.
                continue

            if field == "cf_rank":
                # TODO: Ignore this for now. Example usage in 1475099.
                continue

            if field in ["alias", "restrict_comments"]:
                continue

            if field == "longdescs.isprivate":
                # Ignore for now.
                continue

            if field == "version":
                # TODO: Ignore this for now. Example usage in 1162372 or 1389926.
                continue

            if "attachment_id" in change and field.startswith("attachments"):
                # TODO: Ignore changes to attachments for now.
                continue

            if field == "flagtypes.name":
                if "attachment_id" in change:
                    # https://bugzilla.mozilla.org/show_bug.cgi?id=1516172
                    if bug["id"] == 1_421_395:
                        continue

                    obj = None
                    for attachment in bug["attachments"]:
                        if attachment["id"] == change["attachment_id"]:
                            obj = attachment
                            break

                    if obj is None:
                        assert_or_log(f"Attachment {change['attachment_id']} not found")
                        continue
                else:
                    obj = bug

                if change["added"]:
                    for to_remove in change["added"].split(", "):
                        # TODO: Skip needinfo/reviews for now, we need a way to match them precisely when there are multiple needinfos/reviews requested.
                        is_question_flag = any(
                            to_remove.startswith(s)
                            for s in [
                                "needinfo",
                                "review",
                                "feedback",
                                "ui-review",
                                "sec-approval",
                                "sec-review",
                                "data-review",
                                "approval-mozilla-",
                            ]
                        )

                        name, status, requestee = parse_flag_change(to_remove)

                        found_flag = None
                        for f in obj["flags"]:
                            if (
                                f["name"] == name
                                and f["status"] == status
                                and (
                                    requestee is None
                                    or (
                                        "requestee" in f and f["requestee"] == requestee
                                    )
                                )
                            ):
                                if (
                                    found_flag is not None
                                    and not is_expected_inconsistent_change_flag(
                                        to_remove, obj["id"]
                                    )
                                    and not is_question_flag
                                ):
                                    flag_text = "{}{}".format(f["name"], f["status"])
                                    if "requestee" in f:
                                        flag_text = "{}{}".format(
                                            flag_text, f["requestee"]
                                        )
                                    assert_or_log(f"{flag_text} found twice!")
                                found_flag = f

                        if found_flag is not None:
                            obj["flags"].remove(found_flag)
                        elif (
                            not is_expected_inconsistent_change_flag(
                                to_remove, obj["id"]
                            )
                            and not is_question_flag
                        ):
                            assert_or_log(
                                f"flag {to_remove} not found, in obj {obj['id']}"
                            )

                if change["removed"]:
                    # Inconsistent review and needinfo flags.
                    if bug["id"] in [785931, 1_342_178]:
                        continue

                    for to_add in change["removed"].split(", "):
                        name, status, requestee = parse_flag_change(to_add)

                        new_flag = {"name": name, "status": status}
                        if requestee is not None:
                            new_flag["requestee"] = requestee

                        obj["flags"].append(new_flag)

                continue

            # We don't support comment tags yet.
            if field == "comment_tag":
                continue

            if field == "comment_revision":
                obj = None
                for comment in bug["comments"]:
                    if comment["id"] == change["comment_id"]:
                        obj = comment
                        break

                if obj is None:
                    if change["comment_id"] != 14096735:
                        assert_or_log(f"Comment {change['comment_id']} not found")
                    continue

                if obj["count"] != change["comment_count"]:
                    assert_or_log("Wrong comment count")

                # TODO: It should actually be applied on "raw_text".
                # if obj["text"] != change["added"]:
                #     assert_or_log(f"Current value for comment: ({obj['text']}) is different from previous value: ({change['added']}")

                obj["text"] = change["removed"]

                continue

            if change["added"] != "---":
                if field not in bug and not is_expected_inconsistent_field(
                    field, last_product, bug["id"]
                ):
                    assert_or_log(f"{field} is not present")

            if field in bug and isinstance(bug[field], list):
                if change["added"]:
                    for to_remove in change["added"].split(", "):
                        if field in FIELD_TYPES:
                            try:
                                to_remove = FIELD_TYPES[field](to_remove)
                            except Exception:
                                assert_or_log(
                                    f"Exception while transforming {to_remove} from {bug[field]} (field {field})"
                                )

                        if to_remove in bug[field]:
                            bug[field].remove(to_remove)
                        elif not is_expected_inconsistent_change_list_field(
                            field, bug["id"], to_remove
                        ):
                            assert_or_log(
                                f"{to_remove} is not in {bug[field]}, for field {field}"
                            )

                if change["removed"]:
                    for to_add in change["removed"].split(", "):
                        if field in FIELD_TYPES:
                            try:
                                to_add = FIELD_TYPES[field](to_add)
                            except Exception:
                                assert_or_log(
                                    f"Exception while transforming {to_add} from {bug[field]} (field {field})"
                                )
                        bug[field].append(to_add)
            else:
                if field in FIELD_TYPES:
                    try:
                        old_value = FIELD_TYPES[field](change["removed"])
                    except Exception:
                        assert_or_log(
                            f"Exception while transforming {change['removed']} from {bug[field]} (field {field})"
                        )
                    try:
                        new_value = FIELD_TYPES[field](change["added"])
                    except Exception:
                        assert_or_log(
                            f"Exception while transforming {change['added']} from {bug[field]} (field {field})"
                        )
                else:
                    old_value = change["removed"]
                    new_value = change["added"]

                if (
                    field in bug
                    and bug[field] != new_value
                    and not is_expected_inconsistent_change_field(
                        field, bug["id"], new_value, bug[field]
                    )
                ):
                    assert_or_log(
                        f"Current value for field {field}: ({bug[field]}) is different from previous value: ({new_value})"
                    )

                bug[field] = old_value

    if len(bug["comments"]) == 0:
        assert_or_log("There must be at least one comment")
        bug["comments"] = [
            {
                "count": 0,
                "id": 0,
                "text": "",
                "author": bug["creator"],
                "creation_time": bug["creation_time"],
            }
        ]

    # If the first comment is hidden.
    if bug["comments"][0]["count"] != 0:
        bug["comments"].insert(
            0,
            {
                "id": 0,
                "text": "",
                "author": bug["creator"],
                "creation_time": bug["creation_time"],
            },
        )

    bug["comments"] = [
        c
        for c in bug["comments"]
        if dateutil.parser.parse(c["creation_time"]) - relativedelta(seconds=3)
        <= rollback_date
    ]
    bug["attachments"] = [
        a
        for a in bug["attachments"]
        if dateutil.parser.parse(a["creation_time"]) - relativedelta(seconds=3)
        <= rollback_date
    ]

    return bug