# optimum_benchmark/trackers/energy.py
import json
import os
from contextlib import contextmanager
from dataclasses import asdict, dataclass
from logging import getLogger
from typing import List, Literal, Optional, Union
from rich.console import Console
from rich.markdown import Markdown
from ..import_utils import is_codecarbon_available, is_torch_available
if is_torch_available():
import torch
if is_codecarbon_available():
from codecarbon import EmissionsTracker, OfflineEmissionsTracker
from codecarbon.output import EmissionsData
CONSOLE = Console()
LOGGER = getLogger("energy")
POWER_UNIT = "W"
ENERGY_UNIT = "kWh"
POWER_CONSUMPTION_SAMPLING_RATE = 1 # in seconds
Energy_Unit_Literal = Literal["kWh"]
Efficiency_Unit_Literal = Literal["samples/kWh", "tokens/kWh", "images/kWh"]
@dataclass
class Energy:
    """Machine-level energy consumption measurement, broken down per hardware component."""

    unit: Energy_Unit_Literal  # energy unit, e.g. "kWh"
    cpu: float
    ram: float
    gpu: float
    total: float

    def __sub__(self, other: "Energy") -> "Energy":
        """Component-wise difference of two measurements sharing the same unit."""
        if self.unit != other.unit:
            raise ValueError("Energy units must match to perform subtraction")

        deltas = {name: getattr(self, name) - getattr(other, name) for name in ("cpu", "gpu", "ram", "total")}
        return Energy(unit=self.unit, **deltas)

    def __truediv__(self, scalar: float) -> "Energy":
        """Divide every component by `scalar` (e.g. to normalize per iteration)."""
        scaled = {name: getattr(self, name) / scalar for name in ("cpu", "gpu", "ram", "total")}
        return Energy(unit=self.unit, **scaled)

    @staticmethod
    def aggregate_across_processes(energies: List[Optional["Energy"]]) -> Optional["Energy"]:
        """Average per-process measurements into one Energy.

        Since measurements are machine-level, the average (not the sum) is taken.
        Raises ValueError on an empty list or if any entry is None.
        """
        if not energies:
            raise ValueError("No energy measurements to aggregate")
        if any(energy is None for energy in energies):
            raise ValueError("Some energy measurements are missing")

        count = len(energies)

        def mean_of(name: str) -> float:
            return sum(getattr(energy, name) for energy in energies) / count

        return Energy(
            unit=energies[0].unit,
            cpu=mean_of("cpu"),
            ram=mean_of("ram"),
            gpu=mean_of("gpu"),
            total=mean_of("total"),
        )

    def to_plain_text(self) -> str:
        """Render the measurement as indented plain-text lines."""
        template = (
            "\t\t+ cpu: {cpu:f} ({unit})\n"
            "\t\t+ gpu: {gpu:f} ({unit})\n"
            "\t\t+ ram: {ram:f} ({unit})\n"
            "\t\t+ total: {total:f} ({unit})\n"
        )
        return template.format(**asdict(self))

    def log(self):
        """Emit each non-empty plain-text line through the module logger."""
        for line in filter(None, self.to_plain_text().split("\n")):
            LOGGER.info(line)

    def to_markdown_text(self) -> str:
        """Render the measurement as a markdown table."""
        template = (
            "| metric | value | unit |\n"
            "| :--------- | --------: | -----: |\n"
            "| cpu | {cpu:f} | {unit} |\n"
            "| gpu | {gpu:f} | {unit} |\n"
            "| ram | {ram:f} | {unit} |\n"
            "| total | {total:f} | {unit} |\n"
        )
        return template.format(**asdict(self))

    def print(self):
        """Pretty-print the markdown table to the rich console."""
        CONSOLE.print(Markdown(self.to_markdown_text()))
@dataclass
class Efficiency:
    """Throughput per unit of energy (e.g. samples/kWh)."""

    unit: Efficiency_Unit_Literal  # efficiency unit, e.g. "samples/kWh"
    value: float

    @staticmethod
    def aggregate_across_processes(efficiencies: List[Optional["Efficiency"]]) -> Optional["Efficiency"]:
        """Average per-process efficiencies into one Efficiency.

        Since measurements are machine-level, the average (not the sum) is taken.
        Raises ValueError on an empty list or if any entry is None.
        """
        if not efficiencies:
            raise ValueError("No efficiency measurements to aggregate")
        if any(efficiency is None for efficiency in efficiencies):
            raise ValueError("Some efficiency measurements are None")

        mean_value = sum(efficiency.value for efficiency in efficiencies) / len(efficiencies)
        return Efficiency(value=mean_value, unit=efficiencies[0].unit)

    @staticmethod
    def from_energy(energy: "Energy", volume: int, unit: str) -> "Efficiency":
        """Build an Efficiency as `volume / energy.total`; zero (or negative) total energy yields 0."""
        ratio = volume / energy.total if energy.total > 0 else 0
        return Efficiency(value=ratio, unit=unit)

    def to_plain_text(self) -> str:
        """Render the efficiency as a single indented plain-text line."""
        return "\t\t+ efficiency: {value:f} ({unit})\n".format(**asdict(self))

    def log(self):
        """Emit each non-empty plain-text line through the module logger."""
        for line in filter(None, self.to_plain_text().split("\n")):
            LOGGER.info(line)

    def to_markdown_text(self) -> str:
        """Render the efficiency as a markdown table."""
        template = (
            "| metric | value | unit |\n"
            "| :--------- | --------: | -----: |\n"
            "| efficiency | {value:f} | {unit} |\n"
        )
        return template.format(**asdict(self))

    def print(self):
        """Pretty-print the markdown table to the rich console."""
        CONSOLE.print(Markdown(self.to_markdown_text()))
class EnergyTracker:
def __init__(self, device: str, backend: str, device_ids: Optional[Union[str, int, List[int]]] = None):
self.device = device
self.backend = backend
self.device_ids = device_ids
self.is_gpu = self.device == "cuda"
self.is_pytorch_cuda = (self.backend, self.device) == ("pytorch", "cuda")
LOGGER.info("\t\t+ Tracking RAM and CPU energy consumption")
if self.is_gpu:
if isinstance(self.device_ids, str):
self.device_ids = list(map(int, self.device_ids.split(",")))
elif isinstance(self.device_ids, int):
self.device_ids = [self.device_ids]
elif isinstance(self.device_ids, list):
self.device_ids = self.device_ids
elif self.device_ids is None:
raise ValueError("GPU device IDs must be provided for energy tracking on GPUs")
else:
raise ValueError("GPU device IDs must be a string, an integer, or a list of integers")
LOGGER.info(f"\t\t+ Tracking GPU energy consumption on devices {self.device_ids}")
if not is_codecarbon_available():
raise ValueError(
"The library codecarbon is required to run energy benchmark, but is not installed. "
"Please install it through `pip install codecarbon`."
)
try:
self.emission_tracker = EmissionsTracker(
log_level="warning",
# tracking_mode="process" only tries to track memory consumption of current process
# but computes cpu and gpu energy consumption based on the machine-level tracking
tracking_mode="machine",
gpu_ids=self.device_ids,
# allow multiple trackers to run in the same machine (e.g., for distributed inference/training)
# and also for testing purposes (we run many benchmarks in parallel)
# https://github.com/mlco2/codecarbon/pull/562 added this feature
# but it doesn't explain why one tracker is better than multiple
allow_multiple_runs=True,
output_file="codecarbon.csv",
measure_power_secs=POWER_CONSUMPTION_SAMPLING_RATE,
)
except Exception:
LOGGER.warning("\t\t+ Falling back to Offline Emissions Tracker")
if os.environ.get("COUNTRY_ISO_CODE", None) is None:
LOGGER.warning(
"\t\t+ Offline Emissions Tracker requires COUNTRY_ISO_CODE to be set. "
"We will set it to USA but the carbon footprint might be inaccurate."
)
self.emission_tracker = OfflineEmissionsTracker(
log_level="warning",
# tracking_mode="process" only tries to track memory consumption of current process
# but computes cpu and gpu energy consumption based on the machine-level tracking
tracking_mode="machine",
gpu_ids=self.device_ids,
# allow multiple trackers to run in the same machine (e.g., for distributed inference/training)
# and also for testing purposes (we run many benchmarks in parallel)
# https://github.com/mlco2/codecarbon/pull/562 added this feature
# but it doesn't explain why one tracker is better than multiple
allow_multiple_runs=True,
output_file="codecarbon.csv",
measure_power_secs=POWER_CONSUMPTION_SAMPLING_RATE,
country_iso_code=os.environ.get("COUNTRY_ISO_CODE", "USA"),
)
self.total_energy: Optional[float] = None
self.cpu_energy: Optional[float] = None
self.gpu_energy: Optional[float] = None
self.ram_energy: Optional[float] = None
def reset(self):
self.total_energy = None
self.cpu_energy = None
self.gpu_energy = None
self.ram_energy = None
@contextmanager
def track(self, task_name: str = "task"):
if self.is_pytorch_cuda:
torch.cuda.synchronize()
self.emission_tracker.start_task(task_name=task_name)
yield
if self.is_pytorch_cuda:
torch.cuda.synchronize()
emission_data: EmissionsData = self.emission_tracker.stop_task()
with open(f"{task_name}_codecarbon.json", "w") as f:
LOGGER.info(f"\t\t+ Saving codecarbon emission data to {task_name}_codecarbon.json")
json.dump(asdict(emission_data), f, indent=4)
self.total_energy = emission_data.energy_consumed
self.cpu_energy = emission_data.cpu_energy
self.gpu_energy = emission_data.gpu_energy
self.ram_energy = emission_data.ram_energy
def get_energy(self) -> Energy:
assert self.total_energy is not None, "Energy must be tracked before calling this method"
return Energy(
unit=ENERGY_UNIT, cpu=self.cpu_energy, gpu=self.gpu_energy, ram=self.ram_energy, total=self.total_energy
)