# optimum/habana/utils.py
# coding=utf-8
# Copyright 2022 the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import subprocess
import time
from typing import Any, Dict, List, Optional
import numpy as np
import torch
from packaging import version
from transformers.utils import is_torch_available
from optimum.utils import logging
from .version import __version__
logger = logging.get_logger(__name__)
CURRENTLY_VALIDATED_SYNAPSE_VERSION = version.parse("1.21.0")
def to_device_dtype(my_input: Any, target_device: Optional[torch.device] = None, target_dtype: Optional[torch.dtype] = None):
    """
    Move a state_dict to the target device and convert it to the target dtype.
    Args:
        my_input: input to transform
        target_device (torch.device, optional): target device to move the input to. Defaults to None.
        target_dtype (torch.dtype, optional): target dtype to convert the input into. Defaults to None.
    Returns:
        the transformed input
    """
if isinstance(my_input, torch.Tensor):
if target_device is None:
target_device = my_input.device
if target_dtype is None:
target_dtype = my_input.dtype
return my_input.to(device=target_device, dtype=target_dtype)
elif isinstance(my_input, list):
return [to_device_dtype(i, target_device, target_dtype) for i in my_input]
elif isinstance(my_input, tuple):
return tuple(to_device_dtype(i, target_device, target_dtype) for i in my_input)
elif isinstance(my_input, dict):
return {k: to_device_dtype(v, target_device, target_dtype) for k, v in my_input.items()}
else:
return my_input
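# Example usage (illustrative sketch, not part of this module): moving a nested state_dict to HPU in bfloat16.
# Assumes an HPU device and the habana_frameworks stack are available.
#
#     state_dict = {"layer": {"weight": torch.ones(2, 2), "step": 3}}
#     state_dict = to_device_dtype(state_dict, target_device=torch.device("hpu"), target_dtype=torch.bfloat16)
#     # Non-tensor leaves (here the integer step counter) are returned unchanged.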
def speed_metrics(
    split: str,
    start_time: float,
    num_samples: Optional[int] = None,
    num_steps: Optional[int] = None,
    num_tokens: Optional[int] = None,
    start_time_after_warmup: Optional[float] = None,
    log_evaluate_save_time: Optional[float] = None,
) -> Dict[str, float]:
"""
Measure and return speed performance metrics.
This function requires a time snapshot `start_time` before the operation to be measured starts and this function
should be run immediately after the operation to be measured has completed.
Args:
split (str): name to prefix metric (like train, eval, test...)
start_time (float): operation start time
num_samples (int, optional): number of samples processed. Defaults to None.
num_steps (int, optional): number of steps performed. Defaults to None.
num_tokens (int, optional): number of tokens processed. Defaults to None.
start_time_after_warmup (float, optional): time after warmup steps have been performed. Defaults to None.
log_evaluate_save_time (float, optional): time spent to log, evaluate and save. Defaults to None.
Returns:
Dict[str, float]: dictionary with performance metrics.
"""
runtime = time.time() - start_time
result = {f"{split}_runtime": round(runtime, 4)}
if runtime == 0:
return result
# Adjust runtime if log_evaluate_save_time should not be included
if log_evaluate_save_time is not None:
runtime = runtime - log_evaluate_save_time
# Adjust runtime if there were warmup steps
if start_time_after_warmup is not None:
runtime = runtime + start_time - start_time_after_warmup
# Compute throughputs
if num_samples is not None:
samples_per_second = num_samples / runtime
result[f"{split}_samples_per_second"] = round(samples_per_second, 3)
if num_steps is not None:
steps_per_second = num_steps / runtime
result[f"{split}_steps_per_second"] = round(steps_per_second, 3)
if num_tokens is not None:
tokens_per_second = num_tokens / runtime
result[f"{split}_tokens_per_second"] = round(tokens_per_second, 3)
return result
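# Example usage (illustrative, with hypothetical numbers): measuring evaluation throughput.
#
#     start = time.time()
#     # ... run evaluation on 100 samples over 25 steps ...
#     metrics = speed_metrics("eval", start, num_samples=100, num_steps=25)
#     # -> {"eval_runtime": ..., "eval_samples_per_second": ..., "eval_steps_per_second": ...}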
def warmup_inference_steps_time_adjustment(
start_time_after_warmup, start_time_after_inference_steps_warmup, num_inference_steps, warmup_steps
):
"""
Adjust start time after warmup to account for warmup inference steps.
When warmup is applied to multiple inference steps within a single sample generation we need to account for
skipped inference steps time to estimate "per sample generation time". This function computes the average
inference time per step and adjusts the start time after warmup accordingly.
Args:
start_time_after_warmup: time after warmup steps have been performed
start_time_after_inference_steps_warmup: time after warmup inference steps have been performed
num_inference_steps: total number of inference steps per sample generation
warmup_steps: number of warmup steps
Returns:
[float]: adjusted start time after warmup which accounts for warmup inference steps based on average non-warmup steps time
"""
if num_inference_steps > warmup_steps:
avg_time_per_inference_step = (time.time() - start_time_after_inference_steps_warmup) / (
num_inference_steps - warmup_steps
)
start_time_after_warmup -= avg_time_per_inference_step * warmup_steps
return start_time_after_warmup
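# Example usage (illustrative): with 50 inference steps per sample of which 3 were treated as warmup,
# the adjustment subtracts 3x the average measured non-warmup step time from `start_time_after_warmup`.
#
#     start_time_after_warmup = warmup_inference_steps_time_adjustment(
#         start_time_after_warmup, start_time_after_inference_steps_warmup, num_inference_steps=50, warmup_steps=3
#     )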
def to_gb_rounded(mem: float) -> float:
"""
Rounds and converts to GB.
Args:
mem (float): memory in bytes
Returns:
float: memory in GB rounded to the second decimal
"""
return np.round(mem / 1024**3, 2)
def get_hpu_memory_stats(device=None) -> Dict[str, float]:
"""
Returns memory stats of HPU as a dictionary:
- current memory allocated (GB)
- maximum memory allocated (GB)
- total memory available (GB)
Returns:
Dict[str, float]: memory stats.
"""
from habana_frameworks.torch.hpu import memory_stats
mem_stats = memory_stats(device)
mem_dict = {
"memory_allocated (GB)": to_gb_rounded(mem_stats["InUse"]),
"max_memory_allocated (GB)": to_gb_rounded(mem_stats["MaxInUse"]),
"total_memory_available (GB)": to_gb_rounded(mem_stats["Limit"]),
}
return mem_dict
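# Example usage (requires an HPU device; the keys are the ones built above):
#
#     stats = get_hpu_memory_stats()
#     logger.info(f"Allocated: {stats['memory_allocated (GB)']} GB out of {stats['total_memory_available (GB)']} GB")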
def set_seed(seed: int):
"""
Helper function for reproducible behavior to set the seed in `random`, `numpy` and `torch`.
Args:
seed (`int`): The seed to set.
"""
random.seed(seed)
np.random.seed(seed)
if is_torch_available():
from habana_frameworks.torch.hpu import random as hpu_random
torch.manual_seed(seed)
hpu_random.manual_seed_all(seed)
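# Example usage (illustrative): seeding Python, NumPy, PyTorch and all HPUs before building dataloaders and models
# so that runs are reproducible. Assumes the habana_frameworks stack is installed.
#
#     set_seed(42)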
def check_synapse_version():
"""
Checks whether the versions of SynapseAI and drivers have been validated for the current version of Optimum Habana.
"""
# Change the logging format
logging.enable_default_handler()
logging.enable_explicit_format()
# Check the version of habana_frameworks
habana_frameworks_version_number = get_habana_frameworks_version()
if (
habana_frameworks_version_number.major != CURRENTLY_VALIDATED_SYNAPSE_VERSION.major
or habana_frameworks_version_number.minor != CURRENTLY_VALIDATED_SYNAPSE_VERSION.minor
):
logger.warning(
f"optimum-habana v{__version__} has been validated for SynapseAI v{CURRENTLY_VALIDATED_SYNAPSE_VERSION} but habana-frameworks v{habana_frameworks_version_number} was found, this could lead to undefined behavior!"
)
# Check driver version
driver_version = get_driver_version()
# This check is needed to make sure an error is not raised while building the documentation
# Because the doc is built on an instance that does not have `hl-smi`
if driver_version is not None:
if (
driver_version.major != CURRENTLY_VALIDATED_SYNAPSE_VERSION.major
or driver_version.minor != CURRENTLY_VALIDATED_SYNAPSE_VERSION.minor
):
logger.warning(
f"optimum-habana v{__version__} has been validated for SynapseAI v{CURRENTLY_VALIDATED_SYNAPSE_VERSION} but the driver version is v{driver_version}, this could lead to undefined behavior!"
)
else:
logger.warning(
"Could not run `hl-smi`, please follow the installation guide: https://docs.habana.ai/en/latest/Installation_Guide/index.html."
)
def get_habana_frameworks_version():
"""
Returns the installed version of SynapseAI.
"""
output = subprocess.run(
"pip list | grep habana-torch-plugin",
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return version.parse(output.stdout.split("\n")[0].split()[-1])
def get_driver_version():
"""
Returns the driver version.
"""
# Enable console printing for `hl-smi` check
output = subprocess.run(
"hl-smi", shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={"ENABLE_CONSOLE": "true"}
)
if output.returncode == 0 and output.stdout:
return version.parse(output.stdout.split("\n")[2].replace(" ", "").split(":")[1][:-1].split("-")[0])
return None
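# Example usage (illustrative): comparing the installed stack against the validated SynapseAI release.
#
#     if get_habana_frameworks_version().minor != CURRENTLY_VALIDATED_SYNAPSE_VERSION.minor:
#         logger.warning("habana-torch-plugin minor version differs from the validated SynapseAI release.")
#     driver_version = get_driver_version()  # None when `hl-smi` is not available (e.g. while building the doc)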
class HabanaGenerationTime(object):
def __init__(self, iteration_times: Optional[List[float]] = None):
self.iteration_times = iteration_times if iteration_times is not None else []
self.start_time = None
self.last_duration = None
def start(self):
self.start_time = time.perf_counter()
def is_running(self):
return self.start_time is not None
def step(self):
if not self.is_running():
raise ValueError("start() must be call before step()")
end_time = time.perf_counter()
duration = end_time - self.start_time
self.iteration_times.append(duration)
self.last_duration = duration
self.start_time = end_time
def total_time(self):
return sum(self.iteration_times)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.step()
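# Example usage (illustrative): timing generation iterations; `generate_one_token` is a placeholder.
#
#     timer = HabanaGenerationTime()
#     timer.start()
#     for _ in range(10):
#         generate_one_token()
#         timer.step()
#     logger.info(f"Average iteration time: {timer.total_time() / len(timer.iteration_times):.4f}s")
#
# The context-manager form appends a single duration on exit, since __exit__ calls step():
#
#     with HabanaGenerationTime(iteration_times) as timer:
#         generate_one_token()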
class HabanaProfile(object):
    """
    The HPU profiler can only be run once, so HABANA_PROFILE_ENABLED, a class variable shared by all instances of
    HabanaProfile, is used to control which part of the run is captured.
    """
HABANA_PROFILE_ENABLED = True
def __init__(
self,
warmup: int = 0,
active: int = 0,
record_shapes: bool = True,
with_stack: bool = False,
output_dir: str = "./hpu_profile",
wait: int = 0,
):
if active <= 0 or warmup < 0 or not HabanaProfile.HABANA_PROFILE_ENABLED:
def noop():
pass
self.start = noop
self.stop = noop
self.step = noop
else:
HabanaProfile.HABANA_PROFILE_ENABLED = False
schedule = torch.profiler.schedule(wait=wait, warmup=warmup, active=active, repeat=1)
activities = [torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.HPU]
profiler = torch.profiler.profile(
schedule=schedule,
activities=activities,
on_trace_ready=torch.profiler.tensorboard_trace_handler(output_dir),
record_shapes=record_shapes,
with_stack=with_stack,
)
self.start = profiler.start
self.stop = profiler.stop
self.step = profiler.step
HabanaProfile.enable.invalid = True
HabanaProfile.disable.invalid = True
    # The three methods below are shadowed by the instance attributes of the same name set in __init__
    # (either no-ops or the underlying torch profiler's start/stop/step), so those attributes are what get called.
    def stop(self):
        self.stop()
    def start(self):
        self.start()
    def step(self):
        self.step()
@staticmethod
def disable():
"""
Runs only once and must happen before doing profiling.
"""
if hasattr(HabanaProfile.disable, "invalid"):
if not HabanaProfile.disable.invalid:
HabanaProfile.HABANA_PROFILE_ENABLED = False
else:
HabanaProfile.HABANA_PROFILE_ENABLED = False
@staticmethod
def enable():
"""
Runs only once and must happen before doing profiling.
"""
if hasattr(HabanaProfile.enable, "invalid"):
if not HabanaProfile.enable.invalid:
HabanaProfile.HABANA_PROFILE_ENABLED = True
else:
HabanaProfile.HABANA_PROFILE_ENABLED = True
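# Example usage (illustrative): profiling a few steps of work. Only the first HabanaProfile instance created with
# `active > 0` captures a trace, because HABANA_PROFILE_ENABLED is flipped to False afterwards.
#
#     profiler = HabanaProfile(warmup=2, active=3, output_dir="./hpu_profile")
#     profiler.start()
#     for _ in range(10):
#         run_one_step()  # placeholder for one step of work
#         profiler.step()
#     profiler.stop()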
def check_optimum_habana_min_version(min_version):
"""
Checks if the installed version of `optimum-habana` is larger than or equal to `min_version`.
Copied from: https://github.com/huggingface/transformers/blob/c41291965f078070c5c832412f5d4a5f633fcdc4/src/transformers/utils/__init__.py#L212
"""
if version.parse(__version__) < version.parse(min_version):
error_message = (
f"This example requires `optimum-habana` to have a minimum version of {min_version},"
f" but the version found is {__version__}.\n"
)
if "dev" in min_version:
error_message += (
"You can install it from source with: "
"`pip install git+https://github.com/huggingface/optimum-habana.git`."
)
raise ImportError(error_message)
def check_habana_frameworks_min_version(min_version):
    """
    Checks if the installed version of `habana_frameworks` is larger than or equal to `min_version`.
    """
    return get_habana_frameworks_version() >= version.parse(min_version)
def check_habana_frameworks_version(req_version):
"""
Checks if the installed version of `habana_frameworks` is equal to `req_version`.
"""
return (get_habana_frameworks_version().major == version.parse(req_version).major) and (
get_habana_frameworks_version().minor == version.parse(req_version).minor
)
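# Example usage (illustrative; the version strings are placeholders): gating features on installed versions.
#
#     check_optimum_habana_min_version("1.10.0")  # raises ImportError if optimum-habana is too old
#     if check_habana_frameworks_min_version("1.13.0"):
#         ...  # use a feature introduced in SynapseAI 1.13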
def get_device_name():
"""
Returns the name of the current device: Gaudi, Gaudi2 or Gaudi3.
Inspired from: https://github.com/HabanaAI/Model-References/blob/a87c21f14f13b70ffc77617b9e80d1ec989a3442/PyTorch/computer_vision/classification/torchvision/utils.py#L274
"""
import habana_frameworks.torch.utils.experimental as htexp
device_type = htexp._get_device_type()
if device_type == htexp.synDeviceType.synDeviceGaudi:
return "gaudi"
elif device_type == htexp.synDeviceType.synDeviceGaudi2:
return "gaudi2"
elif device_type == htexp.synDeviceType.synDeviceGaudi3:
return "gaudi3"
else:
raise ValueError(f"Unsupported device: the device type is {device_type}.")
def get_device_count():
    """
    Returns the number of available Gaudi devices.
    """
    import habana_frameworks.torch.utils.experimental as htexp
    if htexp.hpu.is_available():
        return htexp.hpu.device_count()
    else:
        raise ValueError("No HPU device was found on this system.")