def check_rpmdb_indexes()

in dcrpm/rpmutil.py [0:0]


    def check_rpmdb_indexes(self):
        # type: () -> None
        """
        Poke each rpmdb index file with an rpm query; rebuild it when needed.

        For each rpmdb file we define a rpm command that blows up on
        inconsistencies, or returns incorrect results. Structure:
        'name_of_file': {
            'cmd': [...],   # rpm command, as an argv list
            'checks': [],   # list of conditions to be met
        }
        Indexes mapped to None have no checks defined and are skipped, and so
        are indexes whose file does not exist under self.dbpath.

        When self._poke_index raises DBIndexNeedsRebuild, the index file is
        deleted (if it is a regular file) and the same command is run again
        with no checks, which should trigger a rebuild. The result of that
        second run is then validated against the Packages db checks.

        This is a no-op on darwin.

        Raises:
            DBNeedsRecovery: if the re-run after an index deletion fails one
                of the Packages db checks, or if the initial poke of an index
                raises a DcRPMException other than DBIndexNeedsRebuild.
        """
        if sys.platform == "darwin":
            self.logger.debug("check_rpmdb_indexes is not implemented for darwin")
            return

        rpmdb_indexes = {
            "Basenames": {
                "cmd": [self.rpm_path, "-qf", self.rpm_path, "--dbpath", self.dbpath],
                "checks": [
                    lambda proc: proc.returncode != StatusCode.SEGFAULT,
                    lambda proc: len(proc.stdout.splitlines()) == 1,
                    lambda proc: proc.stdout.splitlines()[0].startswith("rpm-"),
                ],
            },
            "Conflictname": {
                "cmd": [
                    self.rpm_path,
                    "-q",
                    "--conflicts",
                    "setup",
                    "--dbpath",
                    self.dbpath,
                ],
                "checks": [
                    lambda proc: proc.returncode != StatusCode.SEGFAULT,
                    lambda proc: len(proc.stdout.splitlines()) == 3,
                ],
            },
            "Obsoletename": {
                "cmd": [
                    self.rpm_path,
                    "-q",
                    "--obsoletes",
                    "coreutils",
                    "--dbpath",
                    self.dbpath,
                ],
                "checks": [
                    lambda proc: proc.returncode != StatusCode.SEGFAULT,
                    lambda proc: len(proc.stdout.splitlines()) >= 1,
                ],
            },
            "Providename": {
                "cmd": [
                    self.rpm_path,
                    "-q",
                    "--whatprovides",
                    "rpm",
                    "--dbpath",
                    self.dbpath,
                ],
                "checks": [
                    lambda proc: proc.returncode != StatusCode.SEGFAULT,
                    lambda proc: len(proc.stdout.splitlines()) == 1,
                    lambda proc: proc.stdout.splitlines()[0].startswith("rpm-"),
                ],
            },
            "Requirename": {
                "cmd": [
                    self.rpm_path,
                    "-q",
                    "--whatrequires",
                    "rpm",
                    "--dbpath",
                    self.dbpath,
                ],
                "checks": [
                    lambda proc: proc.returncode != StatusCode.SEGFAULT,
                    lambda proc: len(proc.stdout.splitlines()) >= 1,
                    lambda proc: any(
                        line.startswith("rpm-") or line.startswith("yum-")
                        for line in proc.stdout.splitlines()
                    ),
                ],
            },
            "Recommendname": None,
            "Dirnames": None,
            "Group": None,
            "Name": None,
            "Installtid": None,
            "Enhancename": None,  # rarely used
            "Filetriggername": None,  # rarely used
            "Suggestname": None,  # rarely used
            "Supplementname": None,  # rarely used
            "Transfiletriggername": None,  # rarely used
            "Triggername": None,  # rarely used
        }  # type: t.Dict[str, t.Optional[t.Dict[str, t.Union[t.Sequence[str], t.Sequence[t.Callable[[CompletedProcess], bool]]]]]]

        # Checks for Packages db corruption, applied to the re-run that follows
        # an index deletion. Every check must return True, i.e. stderr must not
        # report that the Packages database cannot be opened, and it must
        # mention a "missing index" (presumably rpm's notice that it is
        # regenerating the index that was just removed).
        post_checks = [
            lambda proc: not any(
                "cannot open Packages database" in line
                for line in proc.stderr.splitlines()
            ),
            lambda proc: any(
                "missing index" in line for line in proc.stderr.splitlines()
            ),
        ]

        for index, config in rpmdb_indexes.items():
            # Skip over indexes with no defined checks / conditions
            if not config:
                continue

            # Skip over non existing indexes. The path has to be tested on the
            # filesystem: os.path.join() alone always yields a non-empty
            # (truthy) string, so it can never signal a missing file.
            index_path = os.path.join(self.dbpath, index)
            if not os.path.exists(index_path):
                self.logger.info("%s does not exist", index)
                continue

            try:
                self.logger.info("Attempting to selectively poke at %s index", index)
                # `t` is falsy when the typing module is unavailable; the casts
                # only exist to satisfy the type checker.
                if t:
                    self._poke_index(
                        t.cast(t.List[str], config["cmd"]),
                        t.cast(
                            t.List[t.Callable[[CompletedProcess], bool]],
                            config["checks"],
                        ),
                    )
                else:
                    # pyre-ignore[6]: config["cmd"] and config["checks"]
                    self._poke_index(config["cmd"], config["checks"])

            except DBIndexNeedsRebuild:
                self.status_logger.info(RepairAction.INDEX_REBUILD)
                if os.path.isfile(index_path):
                    self.logger.info("%s index is out of whack, deleting it", index)
                    os.remove(index_path)
                else:
                    self.logger.info("%s index is missing", index)

                # Run the same command again, which should trigger a rebuild.
                # No checks are passed here; the result is validated against
                # post_checks below instead.
                if t:
                    proc = self._poke_index(t.cast(t.List[str], config["cmd"]), [])
                else:
                    # pyre-ignore[6]: config["cmd"]
                    proc = self._poke_index(config["cmd"], [])

                # Sometimes single index rebuilds don't work, as rpm fails to
                # open Packages db. In that case we'll try a full recovery
                for check in post_checks:
                    if not check(proc):
                        self.logger.info("Granular index rebuild failed")
                        raise DBNeedsRecovery()

            except DcRPMException:
                self.logger.info("RPM commands are failing too hard")
                raise DBNeedsRecovery()