in dcrpm/rpmutil.py [0:0]
def check_rpmdb_indexes(self):
    # type: () -> None
    """
    Poke at individual rpmdb index files and rebuild the ones that misbehave.

    For each rpmdb file we define a rpm command that blows up on inconsistencies,
    or returns incorrect results. Structure:
        'name_of_file': {
            'cmd': 'str',  # rpm command
            'checks': [],  # list of conditions to be met
        }
    Indexes mapped to None have no command/checks defined and are skipped, as
    are indexes whose file is not present under self.dbpath.

    When poking an index raises DBIndexNeedsRebuild, the index file is deleted
    (if it is a regular file) and the same command is run again without
    checks, which should trigger a rebuild. The result of that second run is
    then validated against the Packages db post-checks.

    Does nothing on darwin.

    Raises:
        DBNeedsRecovery: if a post-check fails after the granular rebuild, or
            if poking an index raises a DcRPMException other than
            DBIndexNeedsRebuild.
    """
    if sys.platform == "darwin":
        self.logger.debug("check_rpmdb_indexes is not implemented for darwin")
        return

    rpmdb_indexes = {
        "Basenames": {
            "cmd": [self.rpm_path, "-qf", self.rpm_path, "--dbpath", self.dbpath],
            "checks": [
                lambda proc: proc.returncode != StatusCode.SEGFAULT,
                lambda proc: len(proc.stdout.splitlines()) == 1,
                lambda proc: proc.stdout.splitlines()[0].startswith("rpm-"),
            ],
        },
        "Conflictname": {
            "cmd": [
                self.rpm_path,
                "-q",
                "--conflicts",
                "setup",
                "--dbpath",
                self.dbpath,
            ],
            "checks": [
                lambda proc: proc.returncode != StatusCode.SEGFAULT,
                lambda proc: len(proc.stdout.splitlines()) == 3,
            ],
        },
        "Obsoletename": {
            "cmd": [
                self.rpm_path,
                "-q",
                "--obsoletes",
                "coreutils",
                "--dbpath",
                self.dbpath,
            ],
            "checks": [
                lambda proc: proc.returncode != StatusCode.SEGFAULT,
                lambda proc: len(proc.stdout.splitlines()) >= 1,
            ],
        },
        "Providename": {
            "cmd": [
                self.rpm_path,
                "-q",
                "--whatprovides",
                "rpm",
                "--dbpath",
                self.dbpath,
            ],
            "checks": [
                lambda proc: proc.returncode != StatusCode.SEGFAULT,
                lambda proc: len(proc.stdout.splitlines()) == 1,
                lambda proc: proc.stdout.splitlines()[0].startswith("rpm-"),
            ],
        },
        "Requirename": {
            "cmd": [
                self.rpm_path,
                "-q",
                "--whatrequires",
                "rpm",
                "--dbpath",
                self.dbpath,
            ],
            "checks": [
                lambda proc: proc.returncode != StatusCode.SEGFAULT,
                lambda proc: len(proc.stdout.splitlines()) >= 1,
                lambda proc: any(
                    line.startswith("rpm-") or line.startswith("yum-")
                    for line in proc.stdout.splitlines()
                ),
            ],
        },
        "Recommendname": None,
        "Dirnames": None,
        "Group": None,
        "Name": None,
        "Installtid": None,
        "Enhancename": None,  # rarely used
        "Filetriggername": None,  # rarely used
        "Suggestname": None,  # rarely used
        "Supplementname": None,  # rarely used
        "Transfiletriggername": None,  # rarely used
        "Triggername": None,  # rarely used
    }  # type: t.Dict[str, t.Optional[t.Dict[str, t.Union[t.Sequence[str], t.Sequence[t.Callable[[CompletedProcess], bool]]]]]]

    # Checks for Packages db corruption, applied to the re-run after an index
    # has been removed. Both must hold: rpm must not complain that it cannot
    # open the Packages database, and its stderr must mention "missing index"
    # (presumably emitted when rpm regenerates the index -- TODO confirm).
    post_checks = [
        lambda proc: not any(
            "cannot open Packages database" in line
            for line in proc.stderr.splitlines()
        ),
        lambda proc: any(
            "missing index" in line for line in proc.stderr.splitlines()
        ),
    ]

    for index, config in rpmdb_indexes.items():
        # Skip over indexes with no defined checks / conditions
        if not config:
            continue

        index_path = os.path.join(self.dbpath, index)

        # Skip over non existing indexes. The path has to be tested on disk:
        # the joined path string itself is always truthy.
        if not os.path.exists(index_path):
            self.logger.info("%s does not exist", index)
            continue

        try:
            self.logger.info("Attempting to selectively poke at %s index", index)
            # `t` is falsy when the typing module is unavailable; the casts
            # only exist to satisfy the type checker.
            if t:
                self._poke_index(
                    t.cast(t.List[str], config["cmd"]),
                    t.cast(
                        t.List[t.Callable[[CompletedProcess], bool]],
                        config["checks"],
                    ),
                )
            else:
                # pyre-ignore[6]: config["cmd"] and config["checks"]
                self._poke_index(config["cmd"], config["checks"])
        except DBIndexNeedsRebuild:
            self.status_logger.info(RepairAction.INDEX_REBUILD)
            if os.path.isfile(index_path):
                self.logger.info("%s index is out of whack, deleting it", index)
                os.remove(index_path)
            else:
                self.logger.info("%s index is missing", index)

            # Run the same command again, which should trigger a rebuild
            if t:
                proc = self._poke_index(t.cast(t.List[str], config["cmd"]), [])
            else:
                # pyre-ignore[6]: config["cmd"]
                proc = self._poke_index(config["cmd"], [])

            # Sometimes single index rebuilds don't work, as rpm fails to
            # open Packages db. In that case we'll try a full recovery
            for check in post_checks:
                if not check(proc):
                    self.logger.info("Granular index rebuild failed")
                    raise DBNeedsRecovery()
        except DcRPMException:
            self.logger.info("RPM commands are failing too hard")
            raise DBNeedsRecovery()