# Copyright 2024 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Benchmark to measure the throughput of a managed AI Model's inference."""
import logging
import os
import statistics
from typing import Any
from absl import flags
from perfkitbenchmarker import benchmark_spec as bm_spec
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker.resources import managed_ai_model
from perfkitbenchmarker.scripts import throughput_load_driver
BENCHMARK_NAME = 'ai_model_throughput'
BENCHMARK_CONFIG = """
ai_model_throughput:
  description: >
    Records the throughput of a model.
  ai_model:
    model_name: 'llama2'
    model_size: '7b'
    cloud: 'GCP'
  vm_groups:
    default:
      vm_spec: *default_dual_core
      vm_count: 1
  flags:
    gcloud_scopes: cloud-platform
"""
_STARTING_REQUESTS = flags.DEFINE_integer(
'ai_starting_requests',
5,
    'Number of requests to send in parallel at the beginning of the test.',
)
_MAX_PARALLEL_REQUESTS = flags.DEFINE_integer(
'ai_max_requests',
None,
'Max number of requests to send in parallel before ending the test. Set to'
' None or the same number as starting requests to effectively run a QPS'
' test at only that value.',
)
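# Example invocation (illustrative flag values only):
#   ./pkb.py --benchmarks=ai_model_throughput --ai_starting_requests=5 \
#     --ai_max_requests=20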
# File located at google3/third_party/py/perfkitbenchmarker/scripts/
THROUGHPUT_DRIVER_SCRIPT = 'throughput_load_driver.py'
_QUEUE_WAIT_TIME = 10 * 60
# SageMaker times out requests if they take longer than 95 seconds.
_FAIL_LATENCY = 95
_SHARED_REQUEST = 'Why do crabs walk sideways?'
def GetConfig(user_config: dict[Any, Any]) -> dict[Any, Any]:
"""Load and return benchmark config.
Args:
user_config: user supplied configuration (flags and config file)
Returns:
loaded benchmark configuration
"""
return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
def Prepare(benchmark_spec: bm_spec.BenchmarkSpec):
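  """Installs dependencies needed by the throughput driver script on the VM."""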
vm = _GetVm(benchmark_spec)
vm.Install('pip')
vm.RunCommand('pip install absl-py')
def _GetVm(
benchmark_spec: bm_spec.BenchmarkSpec,
) -> virtual_machine.BaseVirtualMachine:
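  """Returns the client VM (from the default group) that drives the load."""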
assert benchmark_spec.vm_groups
assert benchmark_spec.vm_groups['default']
return benchmark_spec.vm_groups['default'][0]
def CheckPrerequisites(benchmark_config):
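  """Validates the request-count flags before the benchmark runs."""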
del benchmark_config
if (
_MAX_PARALLEL_REQUESTS.value
and _MAX_PARALLEL_REQUESTS.value < _STARTING_REQUESTS.value
):
raise errors.Config.InvalidValue(
'ai_max_requests must be None or >= ai_starting_requests. Got:'
f' {_MAX_PARALLEL_REQUESTS.value} as compared to'
f' {_STARTING_REQUESTS.value}'
)
def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]:
"""Runs the throughput benchmark.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
Returns:
A list of sample.Sample instances.
"""
logging.info('Running Run phase & finding throughput')
vm = _GetVm(benchmark_spec)
throughput_script = vm.PrepareResourcePath(THROUGHPUT_DRIVER_SCRIPT)
model = benchmark_spec.ai_model
assert model
# Label whether it's the first model or not.
endpoints = model.ListExistingEndpoints()
model.metadata.update({'First Model': len(endpoints) == 1})
# Confirm we can send one request.
model.SendPrompt(_SHARED_REQUEST, 512, 1.0)
return FindMaxThroughput(model, throughput_script, vm)
def FindMaxThroughput(
ai_model: managed_ai_model.BaseManagedAiModel,
throughput_script: str,
vm: virtual_machine.BaseVirtualMachine,
) -> list[sample.Sample]:
"""Finds the max throughput for the model."""
command = ai_model.GetPromptCommand(_SHARED_REQUEST, 512, 1.0)
logging.info(
'Finding max throughput & calling models with command: %s', command
)
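  # Number of additional parallel requests added at each step of the ramp.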
step = 3
last_responses = []
burst_requests = _STARTING_REQUESTS.value
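  # With no explicit max, run a single iteration at the starting request count.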
max_requests = _MAX_PARALLEL_REQUESTS.value or (_STARTING_REQUESTS.value + 1)
failed_responses = []
responses = []
expected_responses = 0
for burst_requests in range(_STARTING_REQUESTS.value, max_requests, step):
    logging.info('Sending bursts of %s parallel requests', burst_requests)
responses = _BurstRequestsOverTime(
command,
throughput_script,
vm,
burst_requests,
)
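    # Treat any response slower than the failure latency threshold as failed.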
failed_responses = [
response
for response in responses
        # Vertex AI responses always return a meaningless error, so we can't
        # check the error status; latency is used to detect failures instead.
if response.end_time - response.start_time > _FAIL_LATENCY
]
if failed_responses:
logging.info(
          'Reached failure point at bursts of %s requests with %s failures',
burst_requests,
len(failed_responses),
)
break
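    # Verify the client kept up: compare responses received against the number
    # expected for this burst size, test duration, and burst interval.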
expected_responses = throughput_load_driver.GetExpectedNumberResponses(
burst_requests,
throughput_load_driver.TEST_DURATION.value,
throughput_load_driver.BURST_TIME.value,
)
if len(responses) < expected_responses:
logging.info(
          'Expected %s responses but only got %s. This means not all requests'
          ' completed within the expected time. Failed at bursts of %s'
          ' requests',
expected_responses,
len(responses),
burst_requests,
)
break
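    # This burst size succeeded; keep its responses in case the next one fails.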
last_responses = responses
if not last_responses:
logging.warning(
        'The very first QPS tried had errors. A smaller starting QPS probably'
        ' needs to be chosen.',
)
return _AggregateResponses(
responses, failed_responses, ai_model, _STARTING_REQUESTS.value
)
last_successful_bursts = burst_requests
if failed_responses or len(responses) < expected_responses:
# We just failed, so output results from the last successful QPS.
last_successful_bursts = burst_requests - step
else:
logging.warning(
'Reached max burst value of %s without failures. Ending the test &'
' outputting results from the highest run QPS.',
last_successful_bursts,
)
samples = _AggregateResponses(
last_responses, failed_responses, ai_model, last_successful_bursts
)
assert samples
metadata = samples[0].metadata
samples.append(
sample.Sample(
'max_throughput',
last_successful_bursts / throughput_load_driver.BURST_TIME.value,
'count',
metadata,
)
)
return samples
def _BurstRequestsOverTime(
command: str,
throughput_script: str,
vm: virtual_machine.BaseVirtualMachine,
burst_requests: int,
) -> list[throughput_load_driver.CommandResponse]:
"""Calls VM to send burst_requests requests in parallel.
  Also pulls flag values from throughput_load_driver. Specifically, the
load test will send burst_requests requests in parallel for
TEST_DURATION seconds, waiting BURST_TIME seconds before sending another
burst.
Args:
command: The command to send to the model.
throughput_script: The path to the throughput script on the VM.
vm: The VM to run the script on.
burst_requests: The number of requests to send in parallel.
Returns:
A list of CommandResponse instances.
"""
try:
vm.RemoteCommand(
f'python3 {throughput_script}'
f' --_ai_throughput_command "{command}"'
f' --ai_test_duration {throughput_load_driver.TEST_DURATION.value}'
f' --ai_burst_time {throughput_load_driver.BURST_TIME.value}'
f' --_ai_throughput_parallel_requests {burst_requests}'
' --ai_throw_on_client_errors='
f'{throughput_load_driver.THROW_ON_CLIENT_ERRORS.value}',
timeout=throughput_load_driver.GetOverallTimeout(),
)
except errors.VmUtil.IssueCommandTimeoutError as timeout_error:
logging.info(
'Throughput load driver timed out with error: %s. This can be expected'
' if processes hung for too long.',
timeout_error,
)
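  # The driver writes its responses to a JSON file keyed by burst size; remove
  # any stale local copy before pulling the fresh results from the VM.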
shared_file_path = throughput_load_driver.GetOutputFilePath(burst_requests)
if os.path.exists(shared_file_path):
os.remove(shared_file_path)
logging.info(
'Pulling the result file %s to local machine for parsing.',
shared_file_path,
)
vm.PullFile(shared_file_path, shared_file_path)
return throughput_load_driver.ReadJsonResponses(burst_requests)
def _AggregateResponses(
responses: list[throughput_load_driver.CommandResponse],
failed_responses: list[throughput_load_driver.CommandResponse],
model: managed_ai_model.BaseManagedAiModel,
burst_requests: int,
) -> list[sample.Sample]:
"""Aggregates the responses into samples."""
successful_durations = [
response.end_time - response.start_time for response in responses
]
logging.info('Response durations dump: %s', successful_durations)
failed_durations = [
response.end_time - response.start_time for response in failed_responses
]
logging.info('Failed response durations dump: %s', failed_durations)
metadata = model.GetResourceMetadata()
effective_qps = burst_requests / throughput_load_driver.BURST_TIME.value
metadata.update({
'parallel_requests': burst_requests,
'test_duration': throughput_load_driver.TEST_DURATION.value,
'burst_time': throughput_load_driver.BURST_TIME.value,
'effective_qps': effective_qps,
})
samples = []
if failed_durations:
samples.append(
sample.Sample(
'failure_median_response_time',
statistics.median(failed_durations),
'seconds',
metadata,
)
)
samples.append(
sample.Sample(
'num_failures',
len(failed_durations),
'count',
metadata,
)
)
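  # With no successful responses there are no latency statistics to report.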
if not successful_durations:
return samples
samples.append(
sample.Sample(
'success_rate',
len(successful_durations)
/ (len(successful_durations) + len(failed_durations))
* 100.0,
'percent',
metadata,
)
)
samples.append(
sample.Sample(
'num_responses',
len(responses),
'count',
metadata,
)
)
samples.append(
sample.Sample(
'median_response_time',
statistics.median(successful_durations),
'seconds',
metadata,
)
)
samples.append(
sample.Sample(
'mean_response_time',
statistics.mean(successful_durations),
'seconds',
metadata,
)
)
return samples
def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec):
"""Cleanup resources to their original state.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
"""
logging.info('Running Cleanup phase of the benchmark')
del benchmark_spec