perfkitbenchmarker/scripts/gcsfuse_scripts/read.py (79 lines of code) (raw):
"""Script that concurrently reads files to test the max throughput.
Example usages:
(1) Specify `--mountpoint` to read from gcsfuse.
> gsutil ls gs://gcsfuse-benchmark/10M/ | python read.py --mountpoint=/gcs/
(2) Omit `--mountpoint` to read from GCS using tf.io.gfile; specify
`--iterations` to run it multiple times.
> gsutil ls gs://gcsfuse-benchmark/10M/ | python read.py --iterations=3
"""
import concurrent.futures
import sys
import time
from typing import Tuple
from absl import flags
from absl import logging
import tensorflow as tf
FLAGS = flags.FLAGS
flags.DEFINE_integer(
"iterations", 1, "Number of iterations this benchmark should repeated run."
)
flags.DEFINE_integer(
"workers", 16, "Number of workers this benchmark runs concurrently."
)
flags.DEFINE_string(
"mountpoint",
None,
"The directory where all the GCS buckets are mounted. If "
"omitted, the benchmark reads the objects with tf.io.gfile "
"instead.",
)
flags.DEFINE_bool("verbose", False, "Print the results with extra information.")
class ObjectReader:
"""Provides a function to open and read an object as a file from GCS."""
def __init__(self, object_name, mountpoint):
self.object_name = object_name
self.mountpoint = mountpoint
def OpenFile(self):
"""Opens a Gfile or a file in the file system mounted by gcsfuse.
Returns:
An opened file or Gfile.
"""
if self.mountpoint:
file_name = self.object_name.replace("gs://", self.mountpoint)
return open(file_name, "rb")
else:
return tf.io.gfile.GFile(self.object_name, "rb")
def ReadFull(self):
"""Reads the entire file and returns the bytes read.
Returns:
The number of bytes read from the file.
"""
bytes_read = 0
with self.OpenFile() as f:
while True:
data = f.read(2 * 1024 * 1024)
if data:
bytes_read += len(data)
else:
break
return bytes_read
class ReadBenchmark:
"""Runs a benchmark by reading files and measure the throughput."""
def __init__(self):
self.iterations = FLAGS.iterations
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=FLAGS.workers
)
objects = sys.stdin.read().split("\n")
self.readers = [ObjectReader(o, FLAGS.mountpoint) for o in objects if o]
def Run(self) -> None:
"""Run the benchmark N times, printing all the metrics per iteration."""
for it in range(self.iterations):
total_mb, duration_sec = self.RunAllReaders()
self.PrintResult(it, total_mb, duration_sec)
self.executor.shutdown()
def RunAllReaders(self) -> Tuple[float, float]:
"""Read all files, returning bytes read and duration.
Returns:
A tuple including a total number of Megabyes read, and duration taken in
seconds.
"""
start = time.time()
size_list = list(self.executor.map(lambda r: r.ReadFull(), self.readers))
total_mb = sum(size_list) * 1.0 / (1024 * 1024)
duration_sec = time.time() - start
return total_mb, duration_sec
def PrintResult(
self, iteration: int, total_mb: float, duration_sec: float
) -> None:
"""Print the benchmark result.
Args:
iteration: An int N indicates this result comes from N-th iteration.
total_mb: Total amount of data being read in the iteration, in Megabytes.
duration_sec: The seconds it took to finish this iteration.
"""
throughput = total_mb / duration_sec
if FLAGS.verbose:
info = "#{}: {} MB, {:.1f} seconds, {:.1f} MB/s".format(
iteration, total_mb, duration_sec, throughput
)
print(info)
else:
print(throughput)
if __name__ == "__main__":
try:
FLAGS(sys.argv)
except flags.Error as e:
logging.exception("%s\nUsage: %s ARGS\n%s", e, sys.argv[0], FLAGS)
sys.exit(1)
ReadBenchmark().Run()