common/recipes-utils/vboot-utils/files/tpm_event_log.py (301 lines of code) (raw):
#!/usr/bin/env python3
#
# Copyright 2018-present Facebook. All Rights Reserved.
#
# This program file is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program in a file named COPYING; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301 USA
import functools
import json
import mmap
import os
import struct
from measure_func import (
Pcr,
measure_spl,
measure_keystore,
measure_uboot,
measure_rcv_uboot,
measure_vbs,
measure_os,
)
from vboot_common import (
get_soc_model,
)
class MalformedTPMEventLog(Exception):
    """Raised when the raw TPM event log cannot be parsed.

    Used for structural problems in the log itself: an oversized length
    field, a bad end-mark magic code, an unsupported format version, or
    record data that does not add up to the recorded length.
    """
class InvalidTPMEventLog(Exception):
    """Raised when a well-formed TPM event log fails verification.

    Used when a logged digest does not match the expected measurement,
    when the extension indexes of a PCR are out of sequence, or when
    unrelated measurements are extended into the same PCR.
    """
# Number of bytes that make up the TPM event log region.
EVENT_LOG_SIZE = 2 * 1024

# TPM event log SRAM mapping, keyed by SoC model:
#   (sram_addr, sram_size, log_offset, log_size)
EVENT_LOG_LOCATION = {
    "SOC_MODEL_ASPEED_G5": (0x1E728000, 4 * 1024, 0x800, EVENT_LOG_SIZE),
    "SOC_MODEL_ASPEED_G6": (0x10015000, 4 * 1024, 0x000, EVENT_LOG_SIZE),
}

# Largest record-data length accepted from the log's length field.
# NOTE(review): the 8 bytes presumably account for the 4-byte length field
# plus the 4-byte end mark -- confirm against the firmware side.
EVENT_LOG_MAX_LEN = EVENT_LOG_SIZE - 8
EVENT_LOG_MAGIC_CODE = 0xFBBE
EVENT_LOG_FMT_VER = 1

# The log starts with a uint32_t holding the length of the record data.
EVENT_LOG_START_FMT = "=L"

# The TPM event log record preamble, equivalent to the following C structure:
# struct tpm_event_log_record {
#     uint16_t measurement; /* enumeration of measurements */
#     uint8_t pcrid;        /* enumeration of ast_tpm_pcrs */
#     uint8_t algo;         /* enumeration of tpm2_algorithms */
#     uint32_t index;
#     uint8_t digest[0];    /* hash value, variable length depends on algo */
# };
EVENT_LOG_RECORD_FMT = "=HBBL"

# The log end mark, equivalent to the following C structure:
# union tpm_event_log_end_mark {
#     uint32_t endmark;
#     struct {
#         uint16_t magic;
#         uint16_t fmtver;
#     };
# };
EVENT_LOG_ENDMARK_FMT = "=HH"

# Measurement id -> measurement name; ids are assigned sequentially from 1.
measure_name = dict(
    enumerate(
        (
            "spl",  # 1
            "key-store",  # 2
            "u-boot",  # 3
            "rec-u-boot",  # 4
            "u-boot-env",  # 5
            "vbs",  # 6
            "os:kernel",  # 7
            "os:rootfs",  # 8
            "os:dtb",  # 9
            "recovery-os:kernel",  # 10
            "recovery-os:rootfs",  # 11
            "recovery-os:dtb",  # 12
        ),
        start=1,
    )
)

# Digest algorithm id -> (hash name, digest length in bytes).
digest_algo_info = {
    algo_id: (f"sha{bits}", bits // 8)
    for algo_id, bits in ((0x0B, 256), (0x0C, 384), (0x0D, 512))
}
def get_location_size(soc_model):
    """Return ``(address, size)`` of the TPM event log for *soc_model*.

    The address is the SRAM base address plus the log offset taken from
    ``EVENT_LOG_LOCATION``; the size is the log size from the same entry.
    Raises ``KeyError`` when *soc_model* has no entry in the table.
    """
    sram_addr, _sram_size, log_offset, log_size = EVENT_LOG_LOCATION[soc_model]
    log_addr = sram_addr + log_offset
    return (log_addr, log_size)
def read_log_data_from_sram():
    """Read the raw TPM event log bytes from SRAM through ``/dev/mem``.

    The SRAM window of the detected SoC model is mapped read-only and the
    log region inside that window is returned as bytes.
    """
    sram_addr, sram_size, log_offset, log_size = EVENT_LOG_LOCATION[get_soc_model()]
    mem_fd = os.open("/dev/mem", os.O_RDONLY | os.O_SYNC)
    try:
        with mmap.mmap(
            mem_fd,
            sram_size,
            flags=mmap.MAP_SHARED,
            prot=mmap.PROT_READ,
            offset=sram_addr,
        ) as sram:
            # Position at the start of the log inside the mapped window.
            sram.seek(log_offset)
            return sram.read(log_size)
    finally:
        # The descriptor is always released, even if the mapping fails.
        os.close(mem_fd)
def read_log_data_from_file(fname):
    """Return the full binary content of the event log dump *fname*."""
    with open(fname, "rb") as log_file:
        raw_log = log_file.read()
    return raw_log
def print_tpm_event_log_debug_suggestion():
    """Print the commands that dump the raw TPM event log for debugging.

    When the SoC model can be detected only its log location is printed;
    otherwise the commands for every known SoC model are listed.
    """
    try:
        detected_model = get_soc_model()
        tpm_log_loc_size = {detected_model: get_location_size(detected_model)}
    except Exception:
        # Best effort: fall back to listing every known SoC model.
        print("cannot detect BMC SOC model automatically")
        tpm_log_loc_size = {
            known_model: get_location_size(known_model)
            for known_model in EVENT_LOG_LOCATION
        }

    print("run command to dump the raw event log to debug")
    for soc_model, (log_loc, log_size) in tpm_log_loc_size.items():
        print(
            f"""
{soc_model}:
$> phymemdump {log_loc:#08x} {log_size:#x} -b | hexdump -e '4/4 "%08x " "\\n"'
or
$> phymemdump {log_loc:#08x} {log_size:#x} -b > /tmp/event-log.bin
and download event-log.bin
"""
        )
class TPMEventRecord:
    """One measurement record parsed from the TPM event log."""

    def __init__(
        self,
        mid: int,
        measure: str,
        pcrid: int,
        algo: str,
        index: int,
        digest: bytearray,
    ):
        # Numeric measurement id and the name it maps to.
        self.mid, self.measure = mid, measure
        # PCR the digest was extended into, and its position in that PCR's
        # extension sequence.
        self.pcrid, self.index = pcrid, index
        # Hash algorithm name and the raw digest bytes.
        self.algo, self.digest = algo, digest
class TPMEventLog(object):
    """TPM event log loaded from SRAM or from a dump file.

    The log is parsed on construction. The parsed records can be printed,
    checked against freshly computed measurements, and replayed to compute
    the expected PCR values.
    """

    def __init__(self, flash0_meta, flash1_meta, eventfile=None):
        """Load and parse the event log.

        Args:
            flash0_meta: image meta used to measure spl, rec-u-boot and the
                recovery OS.
            flash1_meta: image meta used to measure key-store, u-boot and
                the OS.
            eventfile: optional path of a raw event log dump; when not
                given the log is read from SRAM.

        Raises:
            MalformedTPMEventLog: when the raw log cannot be parsed.
        """
        self.flash0_meta = flash0_meta
        self.flash1_meta = flash1_meta
        self.eventfile = eventfile
        self.tpm_events = self._load_tpm_events()

    def _load_tpm_events(self) -> "list[TPMEventRecord]":
        """Read the raw log and parse it into a list of event records.

        Raises:
            MalformedTPMEventLog: on an oversized length field, a truncated
                log, a bad end mark, an unsupported format version, an
                unknown measurement or algorithm id, or record data that
                does not add up to the recorded length.
        """
        if self.eventfile:
            log_data = read_log_data_from_file(self.eventfile)
        else:
            log_data = read_log_data_from_sram()

        start_size = struct.calcsize(EVENT_LOG_START_FMT)
        record_size = struct.calcsize(EVENT_LOG_RECORD_FMT)

        # sanity check len
        try:
            log_len = struct.unpack_from(EVENT_LOG_START_FMT, log_data, 0)[0]
        except struct.error as err:
            raise MalformedTPMEventLog(
                f"Malformed TPM log, cannot read log length: {err}"
            ) from err
        if log_len > EVENT_LOG_MAX_LEN:
            raise MalformedTPMEventLog(
                f"Malformed TPM Log, len {log_len} > {EVENT_LOG_MAX_LEN}"
            )

        # sanity check end mark, stored right after the record data
        try:
            end_mark_magic, end_mark_fmt_ver = struct.unpack_from(
                EVENT_LOG_ENDMARK_FMT,
                log_data,
                start_size + log_len,
            )
        except struct.error as err:
            raise MalformedTPMEventLog(
                f"Malformed TPM log, cannot read end mark: {err}"
            ) from err
        if end_mark_magic != EVENT_LOG_MAGIC_CODE:
            raise MalformedTPMEventLog(
                "Malformed TPM log, magic code 0x%04x != 0x%04x"
                % (
                    end_mark_magic,
                    EVENT_LOG_MAGIC_CODE,
                )
            )
        if end_mark_fmt_ver not in [EVENT_LOG_FMT_VER]:
            raise MalformedTPMEventLog(
                "TPM log format ver-%d not supported, only support %s"
                % (
                    end_mark_fmt_ver,
                    [EVENT_LOG_FMT_VER],
                )
            )

        # Parse the log records. Only the declared record region is kept so
        # a bad record cannot be completed with bytes that follow it.
        records = log_data[start_size : start_size + log_len]
        tpm_events = []
        cur = 0
        while cur < log_len:
            try:
                mid, pcrid, algo_id, index = struct.unpack_from(
                    EVENT_LOG_RECORD_FMT, records, cur
                )
            except struct.error as err:
                raise MalformedTPMEventLog(
                    f"Malformed event log, truncated record at offset {cur}"
                ) from err
            cur += record_size
            if algo_id not in digest_algo_info:
                raise MalformedTPMEventLog(
                    f"Malformed event log, unknown digest algo 0x{algo_id:02x}"
                )
            if mid not in measure_name:
                raise MalformedTPMEventLog(
                    f"Malformed event log, unknown measurement id {mid}"
                )
            algo, digest_len = digest_algo_info[algo_id]
            tpm_events.append(
                TPMEventRecord(
                    mid,
                    measure_name[mid],
                    pcrid,
                    algo,
                    index,
                    records[cur : cur + digest_len],
                )
            )
            cur += digest_len
        if cur != log_len:
            raise MalformedTPMEventLog(
                f"Malformed event log, length {cur} != {log_len}"
            )
        return tpm_events

    def print_events(self, asjosn=False):
        """Print the parsed events as a table, or as JSON when *asjosn* is set."""

        class EventLogEncoder(json.JSONEncoder):
            def default(self, obj):
                # bytes-like digests are emitted as hex strings
                if hasattr(obj, "hex"):
                    return obj.hex()
                # event records are emitted as a mapping of their attributes
                if hasattr(obj, "__dict__"):
                    return vars(obj)
                return json.JSONEncoder.default(self, obj)

        if asjosn:
            print(json.dumps(self.tpm_events, cls=EventLogEncoder))
            return

        # normal print
        header_str = "".join(
            [
                f"{'ID':<4}",
                f"{'MID':^4}",
                f"{'Measure-Name':^20}",
                f"{'PCR':^4}",
                f"{'Index':^6}",
                f"{'Algo':^8}",
                f"{'Digest':^64}",
            ]
        )
        print("=" * len(header_str))
        print(header_str)
        print("-" * len(header_str))
        for i, event in enumerate(self.tpm_events, start=1):
            print(
                "".join(
                    [
                        f"{i:<4d}",
                        f"{event.mid:^4d}",
                        f"{event.measure:^20}",
                        f"{event.pcrid:^4d}",
                        f"{event.index:^6d}",
                        f"{event.algo:^8}",
                        f"{event.digest.hex():>64}",
                    ]
                )
            )
        print("=" * len(header_str))

    # NOTE: lru_cache on a method also keys on (and keeps a reference to)
    # self; with maxsize=1 at most one instance is retained per cache.
    # The cache lets the kernel/rootfs/dtb checks share a single
    # measure_os() call.
    @functools.lru_cache(maxsize=1)
    def _measure_os_cached(self, algo, recal):
        """Return the measure_os() result for the OS image (flash1)."""
        return measure_os(algo, self.flash1_meta, recal, True)

    @functools.lru_cache(maxsize=1)
    def _measure_rec_os_cached(self, algo, recal):
        """Return the measure_os() result for the recovery OS image (flash0)."""
        return measure_os(algo, self.flash0_meta, recal, True)

    def _check_digest(self, measure, algo, digest, recal):
        """Check one logged digest against the freshly computed measurement.

        Args:
            measure: measurement name, e.g. "spl" or "os:kernel".
            algo: hash algorithm name passed to the measure functions.
            digest: digest recorded in the event log.
            recal: passed through to the u-boot and OS measure functions.

        Raises:
            InvalidTPMEventLog: when the measurement name is unknown or the
                logged digest differs from the expected one.
        """
        expect_digest = None
        if measure == "spl":
            expect_digest = measure_spl(algo, self.flash0_meta, True)
        elif measure == "key-store":
            expect_digest = measure_keystore(algo, self.flash1_meta, True)
        elif measure == "u-boot":
            expect_digest = measure_uboot(algo, self.flash1_meta, recal, True)
        elif measure == "rec-u-boot":
            expect_digest = measure_rcv_uboot(algo, self.flash0_meta, True)
        elif measure == "os:kernel":
            expect_digest = self._measure_os_cached(algo, recal)[0]
        elif measure == "os:rootfs":
            expect_digest = self._measure_os_cached(algo, recal)[1]
        elif measure == "os:dtb":
            expect_digest = self._measure_os_cached(algo, recal)[2]
        elif measure == "recovery-os:kernel":
            expect_digest = self._measure_rec_os_cached(algo, recal)[0]
        elif measure == "recovery-os:rootfs":
            expect_digest = self._measure_rec_os_cached(algo, recal)[1]
        elif measure == "recovery-os:dtb":
            expect_digest = self._measure_rec_os_cached(algo, recal)[2]
        elif measure == "u-boot-env":  # u-boot-env can change skip checking
            expect_digest = digest
        elif measure == "vbs":
            try:
                expect_digest = measure_vbs(algo, True)
            except Exception:
                # not on BMC cannot measure vbs by-pass
                expect_digest = digest

        if expect_digest is None:
            raise InvalidTPMEventLog(f"Unknown meausre({measure})")
        if digest != expect_digest:
            raise InvalidTPMEventLog(
                f"{measure} digest does't match"
                f"\n\t[{digest.hex()}](event)"
                f"\n\t[{expect_digest.hex()}](expect)"
            )

    def _build_pcr_replay(self, chkdigest, recal):
        """Group the events per "<pcrid>.<algo>" replay target.

        Args:
            chkdigest: when true every digest is verified with
                _check_digest() first.
            recal: passed through to _check_digest().

        Returns:
            dict mapping each replay target to its list of
            (index, measure, digest) tuples sorted by extension index.

        Raises:
            InvalidTPMEventLog: when a digest check fails, or when the
                extension indexes of a target are not exactly 0, 1, 2, ...
        """
        pcr_replay = {}
        # build up PCR replay data from event
        for event in self.tpm_events:
            # check the digest
            if chkdigest:
                self._check_digest(event.measure, event.algo, event.digest, recal)
            # add the digest into replay target's replay list
            pcr_replay.setdefault(f"{event.pcrid}.{event.algo}", []).append(
                (event.index, event.measure, event.digest)
            )

        # Each replay target's extension indexes must be exactly 0..n-1.
        # Comparing the whole sequence also rejects duplicated indexes that
        # would otherwise hide a gap (e.g. 0, 1, 1, 3).
        for pcr_target, replay_list in pcr_replay.items():
            replay_list.sort(key=lambda e: e[0])
            indexes = [e[0] for e in replay_list]
            if indexes != list(range(len(indexes))):
                raise InvalidTPMEventLog(
                    "%s extension event in wrong sequence %s"
                    % (
                        pcr_target,
                        indexes,
                    )
                )
        return pcr_replay

    def replay_measure(self, chkdigest, recal):
        """Replay the logged extensions and return the expected PCR values.

        Args:
            chkdigest: when true every logged digest is verified first.
            recal: passed through to the digest check.

        Returns:
            list of dicts, one per replay target, with the keys
            "component", "pcr_id", "algo", "expect" and "measure".

        Raises:
            InvalidTPMEventLog: when verification fails or when more than
                one major measurement is extended into the same PCR.
        """
        replayed_measures = []
        pcr_replay = self._build_pcr_replay(chkdigest, recal)
        for replay_target, replay_list in pcr_replay.items():
            pcrid, algo = replay_target.split(".")
            pcr = Pcr(algo)
            major_measure = set()
            for _, measure, digest in replay_list:
                pcr.extend(digest)
                # "os:kernel" and "os:rootfs" share the major measure "os"
                major_measure.add(measure.split(":")[0])
            if len(major_measure) != 1:
                raise InvalidTPMEventLog(
                    f"Multiple major measure {sorted(major_measure)} "
                    f"extend in same PCR{pcrid}"
                )
            replayed_measures.append(
                {
                    "component": major_measure.pop(),
                    "pcr_id": int(pcrid),
                    "algo": algo,
                    "expect": pcr.value.hex(),
                    "measure": "NA",
                }
            )
        return replayed_measures