# cost-optimization/hpa-config-recommender/src/hpaconfigrecommender/plan_workload_simulation.py
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" HPA Simulation Plan """
import logging
from typing import List, Tuple, Dict, Optional
import pandas as pd
import numpy as np
from .utils.config import Config
from .utils.models import (
WorkloadDetails,
WorkloadPlan,
)
from .utils.log import (
log_exec_time
)
# Configure logger
logger = logging.getLogger(__name__)
# Configure pandas to copy on writing in a view
pd.options.mode.copy_on_write = True
def _get_proposed_memory_recommendation(
config: Config,
workload_df:pd.DataFrame,
proposed_min_replicas: int,
) -> float:
"""
Calculate the recommended memory request per replica.
This function computes the memory request per replica by dividing the
total memory usage by the proposed minimum replicas and applying an
extra buffer for memory recommendation.
Args:
config (Config): Configuration settings for mem recommendations.
workload_df (pd.DataFrame): DataFrame containing workload metrics,
including memory usage.
proposed_min_replicas (int): Proposed minimum number of replicas.
Returns:
float: Recommended memory request per replica in MiB.
"""
total_memory_capacity= (
workload_df["sum_containers_mem_usage_mi"].max()
)
proposed_min_replicas = max(
proposed_min_replicas, config.MIN_REC_REPLICAS
)
proposed_mem_recommendation = (
(total_memory_capacity/proposed_min_replicas)
)
return np.ceil(
min(
proposed_mem_recommendation,
workload_df["avg_container_mem_usage_mi"].mean()
)*
max(config.EXTRA_HPA_BUFFER_FOR_MEMORY_RECOMMENDATION,1))
def _get_unique_combinations(
workload_plans: List[WorkloadPlan]
) -> List[WorkloadPlan]:
"""
Filters a list of WorkloadPlan objects to return unique entries
based on recommended CPU request, memory request, and replica counts.
The list is sorted by method, then recommended_cpu_request,
recommended_mem_request_and_limits_mi, and max_replicas.
Args:
workload_plans (List[WorkloadPlan]):
List of recommendations from DMR (Dynamic Memory Request).
Returns:
List[WorkloadPlan]: Combined list of recommendations,
where DMR recommendations that duplicate the DCR combinations
(same CPU request and min replicas) are excluded.
"""
seen = set()
unique_plans = []
for plan in workload_plans:
key = (
plan.recommended_cpu_request,
plan.recommended_mem_request_and_limits_mi,
plan.recommended_min_replicas,
plan.recommended_max_replicas
)
if key not in seen:
seen.add(key)
unique_plans.append(plan)
return sorted(
unique_plans,
key=lambda x: (
x.method,
x.recommended_cpu_request,
x.recommended_mem_request_and_limits_mi,
x.recommended_max_replicas
)
)
def _is_workload_balanced(workload_df: pd.DataFrame) -> Optional[bool]:
    """
    Determines if the workload is balanced based on the ratio of CPU
    usage standard deviation to average usage.

    The workload is considered balanced when
    (2 * mean stddev) / mean avg usage < 0.25.

    Args:
        workload_df (pd.DataFrame): DataFrame containing the
            workload metrics with "stddev_containers_cpu_usage"
            and "avg_container_cpu_usage".

    Returns:
        Optional[bool]: True if balanced, False if unbalanced, or None
        for invalid data (empty frame, zero average usage, or a missing
        required column).
    """
    if workload_df.empty:
        logger.warning("Workload DataFrame is empty.")
        return None
    try:
        avg_cpu_usage = workload_df["avg_container_cpu_usage"].mean()
        stddev_cpu_usage = workload_df["stddev_containers_cpu_usage"].mean()
    except KeyError as e:
        # Missing columns are invalid data: return None per the contract
        # (the original returned True here, contradicting its docstring).
        logger.error("KeyError: Missing required column %s", e)
        return None
    if avg_cpu_usage == 0:
        logger.info("Division by zero in workload balancing calculation.")
        return None
    ratio = (2 * stddev_cpu_usage) / avg_cpu_usage
    is_balanced = ratio < 0.25
    logger.info(
        "Workload is balanced: %s (ratio = %.3f)", is_balanced, ratio
    )
    return is_balanced
def _is_cpu_under_provisioned(config: Config,
                              workload_df: pd.DataFrame) -> bool:
    """
    Determines if the CPU is under-provisioned by comparing the
    maximum CPU request with a high percentile of CPU usage.

    Args:
        config (Config): Run configurations, providing
            UNDERPROVISIONED_CPU_USAGE_THRESHOLD (a quantile in [0, 1]).
        workload_df (pd.DataFrame): DataFrame with the
            'avg_container_cpu_usage' column and, optionally,
            'avg_container_cpu_request'.

    Returns:
        bool: True if the maximum CPU request is below the usage
        percentile (CPU under-provisioned), otherwise False.
    """
    underprovisioned_cpu_usage_threshold = (
        config.UNDERPROVISIONED_CPU_USAGE_THRESHOLD
    )
    logger.info("Underprovisioned CPU usage threshold: %.2f",
                underprovisioned_cpu_usage_threshold)
    # Default the request to 0 when the column is absent, so any positive
    # usage percentile marks the workload as under-provisioned.
    max_cpu_request = (
        workload_df.get("avg_container_cpu_request", pd.Series(0)).max()
    )
    cpu_usage_percentile = (
        workload_df["avg_container_cpu_usage"].quantile(
            underprovisioned_cpu_usage_threshold
        )
    )
    return max_cpu_request < cpu_usage_percentile
def _calculate_recommended_max_cpu_capacity(
        config: Config, workload_df: pd.DataFrame) -> float:
    """
    Calculate the recommended maximum HPA CPU capacity for the workload.

    When CPU is under-provisioned, capacity is derived from the peak CPU
    usage inflated by the usage buffer; otherwise the peak CPU request is
    used. The max-replicas buffer is applied on top in both cases.

    Args:
        config (Config): Run configurations.
        workload_df (pd.DataFrame): DataFrame containing
            'sum_containers_cpu_request' and 'sum_containers_cpu_usage'
            columns.

    Returns:
        float: Recommended HPA max CPU capacity for the workload.
    """
    if _is_cpu_under_provisioned(config, workload_df):
        # Requests understate real needs: size capacity from peak usage.
        sum_original_cpu_capacity = (
            workload_df["sum_containers_cpu_usage"].max() *
            config.EXTRA_HPA_BUFFER_FOR_CPU_USAGE_CAPACITY
        )
        logger.info("The CPU is under-provisioned.")
    else:
        sum_original_cpu_capacity = (
            workload_df["sum_containers_cpu_request"].max()
        )
        logger.info("The CPU is not under-provisioned.")
    logger.info("Max CPU capacity %.3f", sum_original_cpu_capacity)
    return (sum_original_cpu_capacity *
            config.EXTRA_HPA_BUFFER_FOR_MAX_REPLICAS)
def _vpa_recommendation(
        config: Config,
        workload_df: pd.DataFrame) -> WorkloadPlan:
    """
    Build a static (VPA-style) recommendation with a fixed replica count.

    The replica count is the smallest observed replica count, floored at
    config.MIN_REC_REPLICAS. CPU request is sized from the 98th
    percentile of total CPU usage, the CPU limit from the peak usage,
    and memory from the peak memory usage — each divided across the
    replicas and inflated by the corresponding buffer.

    Args:
        config (Config): Run configurations providing the buffers and
            the minimum replica floor.
        workload_df (pd.DataFrame): Workload metrics including
            'sum_containers_cpu_usage', 'sum_containers_mem_usage_mi'
            and 'num_replicas_at_usage_window'.

    Returns:
        WorkloadPlan: Static plan with min == max replicas and an HPA
        target CPU of 1.0 (i.e. no horizontal scaling headroom).
    """
    num_of_replicas = max(
        workload_df["num_replicas_at_usage_window"].min(),
        config.MIN_REC_REPLICAS
    )
    vpa_plan = WorkloadPlan(
        method="VPA",
        recommended_cpu_request=round(
            (
                workload_df["sum_containers_cpu_usage"].quantile(0.98)
                / num_of_replicas
            ) * config.EXTRA_HPA_BUFFER_FOR_CPU_USAGE_CAPACITY,
            3
        ),
        recommended_cpu_limit_or_unbounded=np.ceil(
            (
                workload_df["sum_containers_cpu_usage"].max()
                / num_of_replicas
            ) * config.EXTRA_HPA_BUFFER_FOR_CPU_USAGE_CAPACITY
        ),
        recommended_mem_request_and_limits_mi=np.ceil(
            (
                workload_df["sum_containers_mem_usage_mi"].max()
                / num_of_replicas
            ) * config.EXTRA_VPA_BUFFER_FOR_MEMORY_RECOMMENDATION
        ),
        recommended_min_replicas=num_of_replicas,
        recommended_max_replicas=num_of_replicas,
        recommended_hpa_target_cpu=1.0,
        workload_e2e_startup_latency_rows=1
    )
    return vpa_plan
def _dynamic_cpu_request(
    config: Config,
    max_cpu_capacity: float,
    workload_df: pd.DataFrame
) -> List[WorkloadPlan]:
    """
    Generate dynamic CPU request (DCR) options based on usage percentiles.

    For each percentile between MIN_DCR_PERCENTILE_VALUE and
    MAX_DCR_PERCENTILE_VALUE, the per-replica CPU request is taken from
    the corresponding quantile of average container CPU usage (floored
    at MIN_CPU_CORE_PROPOSED_VALUE), and max replicas are derived from
    the overall CPU capacity. Duplicate (request, min, max) combinations
    are skipped.

    Args:
        config (Config): Configuration settings for HPA recommendations.
        max_cpu_capacity (float): Maximum CPU capacity observed in the
            workload.
        workload_df (pd.DataFrame): DataFrame containing workload CPU and
            memory metrics.

    Returns:
        List[WorkloadPlan]: Recommended HPA configurations, one per
        distinct percentile-derived combination.
    """
    # Minimum replicas are floored at the configured minimum.
    replica_floor = max(
        get_min_replicas(workload_df, config),
        config.MIN_REC_REPLICAS
    )
    # Memory is sized once for the (fixed) minimum replica count.
    mem_request_mi = _get_proposed_memory_recommendation(
        config, workload_df, replica_floor
    )
    percentile_grid = np.arange(
        config.MIN_DCR_PERCENTILE_VALUE,
        config.MAX_DCR_PERCENTILE_VALUE + 1
    )
    usage_quantiles = np.percentile(
        workload_df["avg_container_cpu_usage"], percentile_grid
    )
    plans: List[WorkloadPlan] = []
    emitted = set()
    for percentile, quantile in zip(percentile_grid, usage_quantiles):
        # Round to mCPU granularity, never below the proposed floor.
        cpu_request = max(
            round(quantile, config.MCPU_ROUNDING),
            config.MIN_CPU_CORE_PROPOSED_VALUE
        )
        max_replicas = int(np.ceil(max_cpu_capacity / cpu_request))
        key = (cpu_request, replica_floor, max_replicas)
        if key in emitted:
            continue
        emitted.add(key)
        plans.append(
            WorkloadPlan(
                recommended_cpu_request=cpu_request,
                recommended_mem_request_and_limits_mi=mem_request_mi,
                recommended_min_replicas=replica_floor,
                recommended_max_replicas=max_replicas,
                method=f"DCR-{percentile}",
            )
        )
    logger.info(
        "Generated %d Dynamic CPU Request (DCR) options.",
        len(plans),
    )
    return plans
def _dynamic_min_replicas(
    config: Config,
    max_cpu_capacity: float,
    workload_df: pd.DataFrame
) -> List[WorkloadPlan]:
    """
    Generate dynamic minimum replicas (DMR) options based on workload metrics.

    This function calculates possible configurations for the minimum number of
    replicas in an HPA setup by evaluating CPU and memory requirements. The
    per-replica CPU request is fixed at the mean average container CPU usage;
    only the minimum replica count varies across the returned options.

    Args:
        config (Config): Configuration settings for HPA recommendations.
        max_cpu_capacity (float): Maximum CPU capacity observed in the workload.
        workload_df (pd.DataFrame): DataFrame containing workload CPU and memory
            metrics.

    Returns:
        List[WorkloadPlan]: List of recommended HPA configurations.
    """
    min_replicas_options = []
    seen_combinations = set()
    # The proposed per-replica CPU request is the mean usage, rounded to
    # mCPU granularity.
    scaling_method = "mean"
    proposed_cpu_request = round(
        workload_df["avg_container_cpu_usage"].mean(),
        config.MCPU_ROUNDING
    )
    if proposed_cpu_request == 0:
        logger.warning(
            "Proposed CPU request is 0. No replicas can be recommended.")
        return []
    # Initialize loop variables. NOTE(review): the first `while` check
    # uses this unclamped max_replicas; inside the loop it is re-derived
    # from the floor-clamped CPU request.
    min_replicas = config.MIN_REC_REPLICAS
    max_replicas = int(np.ceil(max_cpu_capacity / proposed_cpu_request))
    # Iterate using a while loop to handle dynamic recalculations
    while min_replicas < max_replicas:
        # Clamp the per-replica CPU request to the configured floor.
        # NOTE(review): cpu_request (and therefore the recalculated
        # max_replicas and memory below) does not change across
        # iterations; kept in-loop to preserve the exact control flow.
        cpu_request = max(
            round(proposed_cpu_request, 3),
            config.MIN_CPU_CORE_PROPOSED_VALUE
        )
        # Stop once the guaranteed baseline (min replicas * request)
        # already exceeds the peak observed total CPU usage.
        if (min_replicas * cpu_request) > (
            workload_df["sum_containers_cpu_usage"].max()):
            break
        # Recalculate max replicas based on updated CPU request
        max_replicas = int(np.ceil(max_cpu_capacity / cpu_request))
        # Calculate memory request per replica (sized for max replicas)
        proposed_mem_request_mi = _get_proposed_memory_recommendation(
            config, workload_df, max_replicas
        )
        # Avoid duplicate combinations
        combination = (cpu_request, min_replicas, max_replicas)
        if combination not in seen_combinations:
            seen_combinations.add(combination)
            min_replicas_options.append(
                WorkloadPlan(
                    recommended_cpu_request=cpu_request,
                    recommended_mem_request_and_limits_mi=np.ceil(
                        proposed_mem_request_mi),
                    recommended_min_replicas=min_replicas,
                    recommended_max_replicas=max_replicas,
                    method=f"DMR_{scaling_method}-loop_{min_replicas}",
                )
            )
        # Try the next candidate minimum replica count.
        min_replicas += 1
    logger.info(
        "Generated %d Dynamic Minimum Replicas (DMR-%s) "
        "options based on CPU and memory metrics.",
        len(min_replicas_options),
        scaling_method
    )
    return min_replicas_options
def _calculate_max_usage_slope_up_ratio(
workload_df: pd.DataFrame,
workload_e2e_startup_latency_rows: int
) -> pd.DataFrame:
"""
Updates the input DataFrame in place by calculating the
'max_usage_slope_up_ratio' column while preserving the intermediate
columns for CPU and memory usage during startup latency.
Args:
workload_df (pd.DataFrame): DataFrame containing workload metrics
with CPU and memory usage data.
workload_e2e_startup_latency_rows (int): Number of rows representing
the startup latency window.
Returns:
pd.DataFrame: The input DataFrame updated with the following columns:
- 'max_cpu_usage_in_workload_e2e_startup_latency'
- 'max_mem_usage_mi_in_workload_e2e_startup_latency'
- 'max_usage_slope_up_ratio'
"""
if workload_e2e_startup_latency_rows <= 0:
raise ValueError(
"workload_e2e_startup_latency_rows must be greater than 0."
)
# Create a rolling window indexer
forward_looking = pd.api.indexers.FixedForwardWindowIndexer(
window_size=workload_e2e_startup_latency_rows
)
# Compute rolling max for CPU and memory usage, and keep the columns
workload_df["max_cpu_usage_in_workload_e2e_startup_latency"] = (
workload_df["avg_container_cpu_usage"]
.rolling(window=forward_looking)
.max()
)
workload_df["max_mem_usage_mi_in_workload_e2_startup_latency"] = (
workload_df["max_containers_mem_usage_mi"]
.rolling(window=forward_looking)
.max()
)
# Compute CPU and memory ratios safely (avoiding division by zero)
cpu_ratio = (
workload_df["max_cpu_usage_in_workload_e2e_startup_latency"]
/ workload_df["avg_container_cpu_usage"].replace(0, np.nan)
)
mem_ratio = (
workload_df["max_mem_usage_mi_in_workload_e2_startup_latency"]
/ workload_df["max_containers_mem_usage_mi"].replace(0, np.nan)
)
# Compute the max usage slope-up ratio as the element-wise maximum
workload_df["max_usage_slope_up_ratio"] = np.maximum(
cpu_ratio.fillna(0), mem_ratio.fillna(0)
)
return workload_df
def _get_recommended_configs(
    config: Config,
    plan: WorkloadPlan,
    workload_df: pd.DataFrame
) -> Tuple[Optional[WorkloadPlan], Optional[str]]:
    """
    Calculates the recommended HPA target CPU and CPU limit for a plan.

    The workload rows are filtered to those whose average CPU usage is at
    or above the plan's recommended CPU request; the HPA target and the
    CPU limit are derived from the usage slopes of those rows. The plan
    is mutated in place on success.

    Args:
        config (Config): Run configurations.
        plan (WorkloadPlan): The HPA plan with initial recommendations.
        workload_df (pd.DataFrame): Workload data including the
            'max_usage_slope_up_ratio' and
            'max_cpu_usage_in_workload_e2e_startup_latency' columns
            (see _calculate_max_usage_slope_up_ratio).

    Returns:
        Tuple[Optional[WorkloadPlan], Optional[str]]: The updated plan
        and None on success, or (None, reason) when the plan is skipped.
    """
    # Filter only the points above what is recommended as baseline requests
    plan_request_baseline = plan.recommended_cpu_request
    filtered_df = workload_df[
        workload_df["avg_container_cpu_usage"] >= plan_request_baseline
    ]
    if filtered_df.empty:
        reason = (
            f"Skip HPA Plan {plan.method}. "
            f"No usage above CPU baseline requests:{plan_request_baseline:.2f}."
        )
        logger.info(reason)
        return None, reason
    # Check if slopes are too big for the HPA to keep up with.
    max_usage_slope_up_ratio = round(
        filtered_df["max_usage_slope_up_ratio"].max(), 2
    )
    if max_usage_slope_up_ratio > config.HPA_SCALE_LIMIT:
        reason = (
            f"Skip HPA Plan {plan.method}. Slope ratio "
            f"{max_usage_slope_up_ratio} exceeds HPA scale limit "
            f"{config.HPA_SCALE_LIMIT}."
        )
        logger.info(reason)
        return None, reason
    plan.max_usage_slope_up_ratio = max_usage_slope_up_ratio
    # Leave headroom (HPA_TARGET_BUFFER) and divide by the worst-case
    # slope so scale-up can complete before usage exceeds capacity.
    plan.recommended_hpa_target_cpu = round(
        (
            (1 - config.HPA_TARGET_BUFFER) /
            filtered_df["max_usage_slope_up_ratio"]
        ).min(), 2
    )
    min_hpa_target_cpu = config.MIN_HPA_TARGET_CPU
    max_hpa_target_cpu = config.MAX_HPA_TARGET_CPU
    if plan.recommended_hpa_target_cpu < min_hpa_target_cpu or \
            plan.recommended_hpa_target_cpu > max_hpa_target_cpu:
        reason = (
            f"Skip HPA Plan {plan.method}. Recommended Target CPU "
            f"{plan.recommended_hpa_target_cpu} not between "
            f"{min_hpa_target_cpu} and {max_hpa_target_cpu}."
        )
        logger.info(reason)
        return None, reason
    # CPU limit: request plus the startup-window usage peak spread across
    # the maximum replica count, rounded up to a whole core.
    plan.recommended_cpu_limit_or_unbounded = np.ceil(
        plan.recommended_cpu_request + (
            filtered_df[
                "max_cpu_usage_in_workload_e2e_startup_latency"
            ].max() / plan.recommended_max_replicas
        )
    )
    return plan, None
def convert_data_types(workload_df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts data types of specific columns in a workload DataFrame for
    memory efficiency and optimized performance.

    - Converts 'window_begin' to timezone-naive datetime64[s].
    - Converts 'num_replicas_at_usage_window' to Int16 (nullable).
    - Converts CPU-related columns to float16 for reduced memory usage.
    - Converts memory-related (MiB) columns to float32, which keeps
      fractional values and larger magnitudes while staying compact.

    Parameters:
        workload_df (pd.DataFrame): The DataFrame containing workload data
            with specific columns for conversion.

    Returns:
        pd.DataFrame: DataFrame with columns converted to optimized data
        types.

    Raises:
        KeyError: If any required column is missing from the input
        DataFrame.
    """
    # Convert to datetime[s] and remove timezone awareness; unparsable
    # values become NaT rather than raising (errors="coerce").
    workload_df["window_begin"] = pd.to_datetime(
        workload_df["window_begin"], errors="coerce"
    ).dt.tz_localize(None).astype("datetime64[s]")
    # Convert `num_replicas_at_usage_window` to Int16, allowing for NaN
    workload_df["num_replicas_at_usage_window"] = (
        pd.to_numeric(
            workload_df["num_replicas_at_usage_window"], errors="coerce")
        .astype("Int16")
    )
    # CPU metrics are small magnitudes where float16 precision suffices.
    float16_columns = [
        "avg_container_cpu_usage",
        "stddev_containers_cpu_usage",
        "sum_containers_cpu_request",
        "sum_containers_cpu_usage"
    ]
    # MiB values can exceed float16's ~65k range; use float32.
    float32_columns = [
        "sum_containers_mem_request_mi",
        "sum_containers_mem_usage_mi"
    ]
    for col in float16_columns:
        workload_df[col] = (
            pd.to_numeric(workload_df[col], errors="coerce").astype("float16")
        )
    for col in float32_columns:
        workload_df[col] = (
            pd.to_numeric(workload_df[col], errors="coerce").astype("float32")
        )
    return workload_df
def get_min_replicas(workload_df: pd.DataFrame, config: Config) -> int:
    """
    Return the replica count at the 10th percentile of observed replicas.

    Due to node autoscaling, workloads can be evicted and, during a small
    portion of the time, they can drop below the desired number of
    replicas if they don't have a PDB blocking this behaviour. To avoid
    anchoring on such transient dips, the replica count at the 10th
    percentile is used. The 0.1 value is arbitrary and may be revisited
    in the future.

    Args:
        workload_df (pd.DataFrame): DataFrame with the
            'num_replicas_at_usage_window' column.
        config (Config): Run configurations providing MIN_REC_REPLICAS.

    Returns:
        int: Replica count at the 10th percentile (truncated toward
        zero), or config.MIN_REC_REPLICAS when no window had a positive
        replica count.
    """
    # Windows with zero replicas are measurement gaps, not real scale.
    df = workload_df[workload_df["num_replicas_at_usage_window"] > 0]
    if df.empty:
        return config.MIN_REC_REPLICAS
    min_replicas_at_10p = df["num_replicas_at_usage_window"].quantile(0.1)
    return int(min_replicas_at_10p)
@log_exec_time(logger)
def get_simulation_plans(
    workload_details: WorkloadDetails,
    workload_df: pd.DataFrame
) -> Tuple[List[WorkloadPlan], Dict[str,str]]:
    """
    Returns all recommendations from the DMR, DCR and VPA algorithms,
    plus the reasons any candidate plan (or the whole run) was skipped.

    Args:
        workload_details (WorkloadDetails): Workload details, including
            the run configuration and the end-to-end startup latency
            expressed in rows.
        workload_df (pd.DataFrame): DataFrame with workload metrics.

    Returns:
        Tuple[List[WorkloadPlan], Dict[str, str]]: The viable plans (the
        static VPA plan is always appended) and a mapping from plan
        method — or "general" — to the skip reason.
    """
    logger.info("Starting HPA simulation plan %s.", workload_details)
    reasons = {}
    # Nothing to simulate without metrics.
    if workload_df.empty:
        logger.warning(
            "The workload dataframe is empty, exiting simulation plan."
        )
        reasons["general"] = "Workload dataframe is empty."
        return [], reasons
    # Downcast columns for memory efficiency before the simulations.
    workload_df = convert_data_types(workload_df)
    max_cpu_capacity = _calculate_recommended_max_cpu_capacity(
        workload_details.config, workload_df)
    if max_cpu_capacity == 0:
        logger.warning("CPU Max Capacity is 0, exiting simulation plan.")
        reasons["general"] = "CPU Max Capacity is 0."
        return [], reasons
    # Candidate plans from both dynamic algorithms.
    dcr = _dynamic_cpu_request(
        workload_details.config,
        max_cpu_capacity,
        workload_df
    )
    dmr = _dynamic_min_replicas(
        workload_details.config,
        max_cpu_capacity,
        workload_df
    )
    combinations = dcr + dmr
    # Deduplicate and deterministically order the candidates.
    proposed_hpa_resources = _get_unique_combinations(combinations)
    if not proposed_hpa_resources:
        logger.info(
            "No valid recommendations generated: for %s", workload_details
        )
        reasons["general"] = "No valid recommendations generated."
        return [], reasons
    workload_e2e_startup_latency_rows = (
        workload_details.workload_e2e_startup_latency_rows
    )
    # Precompute the slope-up ratio columns consumed by
    # _get_recommended_configs below.
    workload_df = _calculate_max_usage_slope_up_ratio(
        workload_df, workload_e2e_startup_latency_rows
    )
    plans = []
    for plan in proposed_hpa_resources:
        plan.workload_e2e_startup_latency_rows = (
            workload_e2e_startup_latency_rows
        )
        config_vals, reason = _get_recommended_configs(
            workload_details.config,
            plan,
            workload_df
        )
        # Record why this candidate was dropped and move on.
        if config_vals is None:
            reasons[plan.method] = reason
            continue
        plans.append(plan)
    # Always include the static VPA baseline plan.
    plans.append(_vpa_recommendation(workload_details.config, workload_df))
    logger.info(
        "HPA simulation plan completed successfully with %d plans for %s.",
        len(plans),
        workload_details
    )
    return plans, reasons