# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Extracts required metrics from fio output file and writes to google sheet.

   Takes fio output json filepath as command-line input
   Extracts IOPS, Bandwidth and Latency (min, max, mean) from given input file
   and writes the metrics in appropriate columns in a google sheet

   Usage from perfmetrics/scripts folder:
    python3 -m fio.fio_metrics <path to fio output json file>

"""

from dataclasses import dataclass
import json
import re
import sys
from typing import Any, Callable, Dict, List, Tuple

from bigquery import constants
from bigquery import experiments_gcsfuse_bq
from fio import constants as consts
from gsheet import gsheet


@dataclass(frozen=True)
class JobParam:
  """Dataclass for a FIO job parameter.

  name: Can be any suitable value, it refers to the output dictionary key for
  the parameter. To be used when creating parameter dict for each job.
  json_name: Must match the FIO job specification key. Key for parameter inside
  'global options'/'job options' dictionary
    Ex: For output json = {"global options": {"filesize":"50M"}, "jobs": [
    "job options": {"rw": "read"}]}
    `json_name` for file size will be "filesize" and that for readwrite will be
    "rw"
  format_param: Function returning formatted parameter value. Needed to convert
  parameter to plottable values
    Ex: 'filesize' is obtained as '50M', but we need to convert it to integer
    showing size in kb in order to maintain uniformity
  default: Default value for the parameter

  """
  name: str
  json_name: str
  format_param: Callable[[str], Any]
  default: Any


@dataclass(frozen=True)
class JobMetric:
  """Dataclass for a FIO job metric.

  name: Can be any suitable value, it is used as key for the metric
  when creating metric dict for each job
  levels: Keys for the metric inside 'read'/'write' dictionary in each job.
  Each value in the list must match the key in the FIO output JSON
    Ex: For job = {'read': {'iops': 123, 'latency': {'min': 0}}}
    levels for IOPS will be ['iops'] and for min latency-> ['latency', 'min']
  conversion: Multiplication factor to convert the metric to the desired unit
    Ex: Extracted latency metrics are in nanoseconds, but we need them in
    seconds for plotting. Hence conversion=10^(-9) for latency metrics.
  """
  name: str
  levels: List[str]
  conversion: float


REQ_JOB_PARAMS = []
# DO NOT remove the below append line
REQ_JOB_PARAMS.append(JobParam(consts.RW, consts.RW, lambda val: val, 'read'))

REQ_JOB_PARAMS.append(JobParam(consts.THREADS, consts.NUMJOBS,
                               lambda val: int(val), 1))
REQ_JOB_PARAMS.append(
    JobParam(
        consts.FILESIZE_KB, consts.FILESIZE,
        lambda val: _convert_value(val, consts.FILESIZE_TO_KB_CONVERSION), 0))
# append new params here
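# A hypothetical example of adding a new parameter (kept commented out;
# consts.BLOCKSIZE_KB and consts.BLOCKSIZE are illustrative names, not
# existing constants):
# REQ_JOB_PARAMS.append(
#     JobParam(consts.BLOCKSIZE_KB, consts.BLOCKSIZE,
#              lambda val: _convert_value(val,
#                                         consts.FILESIZE_TO_KB_CONVERSION),
#              0))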

REQ_JOB_METRICS = []
REQ_JOB_METRICS.append(JobMetric(consts.IOPS, [consts.IOPS], 1))
REQ_JOB_METRICS.append(JobMetric(consts.BW_BYTES, [consts.BW_BYTES], 1))
REQ_JOB_METRICS.append(JobMetric(consts.IO_BYTES, [consts.IO_BYTES], 1))
REQ_JOB_METRICS.append(JobMetric('lat_s_min',
                                 [consts.LAT_NS, consts.MIN], consts.NS_TO_S))
REQ_JOB_METRICS.append(JobMetric('lat_s_max',
                                 [consts.LAT_NS, consts.MAX], consts.NS_TO_S))
REQ_JOB_METRICS.append(JobMetric('lat_s_mean',
                                 [consts.LAT_NS, consts.MEAN], consts.NS_TO_S))
REQ_JOB_METRICS.extend([
    JobMetric('lat_s_perc_20',
              [consts.LAT_NS, consts.PERCENTILE, consts.P20], consts.NS_TO_S),
    JobMetric('lat_s_perc_50',
              [consts.LAT_NS, consts.PERCENTILE, consts.P50], consts.NS_TO_S),
    JobMetric('lat_s_perc_90',
              [consts.LAT_NS, consts.PERCENTILE, consts.P90], consts.NS_TO_S),
    JobMetric('lat_s_perc_95',
              [consts.LAT_NS, consts.PERCENTILE, consts.P95], consts.NS_TO_S)])
# append new metrics here
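# A hypothetical example of adding a new metric (kept commented out; assumes
# a consts.P99 constant holding FIO's percentile key '99.000000'):
# REQ_JOB_METRICS.append(
#     JobMetric('lat_s_perc_99',
#               [consts.LAT_NS, consts.PERCENTILE, consts.P99],
#               consts.NS_TO_S))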


def _convert_value(value, conversion_dict, default_unit=''):
  """Converts data strings to a particular unit based on conversion_dict.

  Args:
    value: String, contains data value[+unit]
    conversion_dict: Dictionary containing units and their respective
      multiplication factor
    default_unit: String, specifies the default unit, used if no unit is present
      in 'value'. Ex: In the job file, we can set ramp_time as "10s" or "10".
      For the latter, the default unit (seconds) is considered.

  Returns:
    Int, number in a specific unit

  Raises:
    KeyError: If empty string is passed as value or if unit is present as key in
    conversion_dict
    ValueError: If string has no numerical part

  Ex: For args value = "5s" and conversion_dict=consts.TIME_TO_MS_CONVERSION
      "5s" will be converted to 5000 milliseconds and 5000 will be returned

  """
  num_unit = re.findall('[0-9]+|[A-Za-z]+', value)
  if len(num_unit) == 2:
    unit = num_unit[1]
  else:
    unit = default_unit
  num = num_unit[0]
  mult_factor = conversion_dict[unit.lower()]
  converted_num = int(num) * mult_factor
  return converted_num
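
# Usage sketch (assumes consts.TIME_TO_MS_CONVERSION maps 's' -> 1000 and
# 'ms' -> 1, which is what the ramp-time helpers below rely on):
#   _convert_value('5s', consts.TIME_TO_MS_CONVERSION)       -> 5000
#   _convert_value('10', consts.TIME_TO_MS_CONVERSION, 's')  -> 10000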


def _get_rw(rw_value):
  """Converting read/randread/write/randwrite to just read/write.

  Args:
    rw_value: str, possible values: read/randread/write/randwrite

  Returns:
    str, read/write

  Raises:
    ValueError: If any rw_value other than read/randread/write/randwrite

  """
  if rw_value in ['read', 'randread']:
    return consts.READ
  if rw_value in ['write', 'randwrite']:
    return consts.WRITE
  raise ValueError('Only read/randread/write/randwrite are supported')


class NoValuesError(Exception):
  """Some data is missing from the json output file."""


class FioMetrics:
  """Handles parsing fio output and writing the metrics to a google sheet."""

  def _load_file_dict(self, filepath) -> Dict[str, Any]:
    """Reads json data from given filepath and returns json object.

    Args:
      filepath : str
        Path of the json file to be parsed

    Returns:
      JSON object, contains json data loaded from given filepath

    Raises:
      OSError: If input filepath doesn't exist
      ValueError: file is not in proper JSON format
      NoValuesError: file doesn't contain JSON data

    """
    with open(filepath, 'r') as f:
      fio_out = json.load(f)

    if not fio_out:  # Empty JSON object
      raise NoValuesError(f'JSON file {filepath} returned empty object')
    return fio_out

  def _get_global_ramp_time(self, out_json):
    """Returns the ramp time (in ms) from 'global options', 0 if absent."""
    global_ramptime_ms = 0
    if (consts.GLOBAL_OPTS in out_json and
        consts.RAMPTIME in out_json[consts.GLOBAL_OPTS]):
      global_ramptime_ms = _convert_value(
          out_json[consts.GLOBAL_OPTS][consts.RAMPTIME],
          consts.TIME_TO_MS_CONVERSION, 's')
    return global_ramptime_ms

  def _get_job_ramp_time(self, job):
    """Returns the ramp time (in ms) from a job's 'job options', 0 if absent."""
    ramptime_ms = 0
    if (consts.JOB_OPTS in job and
        consts.RAMPTIME in job[consts.JOB_OPTS]):
      ramptime_ms = _convert_value(job[consts.JOB_OPTS][consts.RAMPTIME],
                                   consts.TIME_TO_MS_CONVERSION, 's')
    return ramptime_ms
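
  # E.g. a job dict like {'job options': {'ramp_time': '10s'}} yields 10000;
  # a unitless '10' is treated as seconds via the 's' default above.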

  def _get_start_end_times(self, out_json,
                           job_params) -> List[Tuple[int, int]]:
    """Returns start and end times of each job as a list.

    Args:
      out_json : FIO json output
      job_params: List of dicts, each dict containing parameters of a job

    Returns:
      List of start and end time tuples, one tuple for each job
      Ex: [(1653027014, 1653027084), (1653027084, 1653027155)]

    Raises:
      KeyError: If RW is not present in any dict in job_params

    """
    # Creating a list of just the 'rw' job parameter.
    rw_list = [job_param[consts.RW] for job_param in job_params]

    global_ramptime_ms = self._get_global_ramp_time(out_json)
    start_end_times = []
    for i, job in enumerate(out_json[consts.JOBS]):
      rw = rw_list[i]
      job_rw = job[_get_rw(rw)]
      ramptime_ms = self._get_job_ramp_time(job)

      if ramptime_ms == 0:
        ramptime_ms = global_ramptime_ms

      start_time_ms = job[consts.JOB_START]
      end_time_ms = start_time_ms + job_rw[consts.RUNTIME] + ramptime_ms
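      # Worked example, reusing the docstring's numbers (hypothetical values):
      # job_start = 1653027014000 ms, runtime = 60000 ms, ramp_time = 10000 ms
      # give end_time_ms = 1653027084000, i.e. the tuple
      # (1653027014, 1653027084) after the conversion to seconds below.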

      # Convert start and end times from milliseconds to seconds.
      start_time_s = start_time_ms // 1000
      end_time_s = round(end_time_ms / 1000)
      start_end_times.append((start_time_s, end_time_s))

    return start_end_times

  def _get_job_params(self, out_json):
    """Returns parameter values of each job.

    We extract each job parameter from 'global options' or 'job options' in
    the JSON using the key specified by `json_name`. The parameter is
    formatted by the function in `format_param` and the formatted value is
    stored against the `name` key. If the parameter is not found in the JSON
    object, the `default` value is used.

    Args:
      out_json : FIO json output

    Returns:
      List of dicts, each dict containing parameters for a job
        Ex: [{'filesize_kb': 50000, 'num_threads': 40, 'rw': 'read'}]

    Function working example:
      Ex: out_json = {"global options": {"filesize": "50M", "numjobs": "40"},
                      "jobs":[{"job options": {"numjobs": "10"}}]
                      }
      For REQ_JOB_PARAMS = [
          JobParam(
              name= RW,
              json_name= RW,
              format_param=lambda val: val,
              default = 'read'
          ),
          JobParam(
              name= THREADS,
              json_name= NUMJOBS,
              format_param=lambda val: int(val),
              default = 1
          ),
          JobParam(
              name= FILESIZE_KB,
              json_name= FILESIZE,
              format_param=lambda val: _convert_value(val,
              consts.FILESIZE_TO_KB_CONVERSION),
              default = 0
          )
      ]
      Extracted parameters would be [{RW:'read', THREADS: 10, FILESIZE_KB:
      50000}]


    """
    # Job parameters specified as global options.
    # Each param is formatted according to its format function before storing.
    global_params = {}
    for param in REQ_JOB_PARAMS:
      # If the param is not present in global options, its default value is
      # used.
      if (consts.GLOBAL_OPTS in out_json and
          param.json_name in out_json[consts.GLOBAL_OPTS]):
        global_params[param.name] = param.format_param(
            out_json[consts.GLOBAL_OPTS][param.json_name])
      else:
        global_params[param.name] = param.default

    # Job parameters specified as job options overwrite global options.
    params = []
    for job in out_json[consts.JOBS]:
      curr_job_params = {}
      job_opts = job.get(consts.JOB_OPTS, {})
      for param in REQ_JOB_PARAMS:
        # If the param is not present in job options, the global param is
        # used.
        if param.json_name in job_opts:
          curr_job_params[param.name] = param.format_param(
              job_opts[param.json_name])
        else:
          curr_job_params[param.name] = global_params[param.name]

      params.append(curr_job_params)

    return params

  def _extract_metrics(self, fio_out) -> List[Dict[str, Any]]:
    """Extracts and returns required metrics from fio output dict.

      The extracted metrics are stored in a list. Each entry in the list is a
      dictionary storing the following fio metrics related to a particular
      job:
        filesize, number of threads, IOPS, bandwidth, IO bytes and latency
        (min, max, mean and percentiles)

    Args:
      fio_out: JSON object representing the fio output

    Returns:
      List of dicts, contains list of jobs and required parameters and metrics
      for each job
      Example return value:
        [{'params': {'filesize': 50000, 'num_threads': 40, 'rw': 'read'},
          'start_time': 1653027084, 'end_time': 1653027155, 'metrics':
        {'iops': 95.26093, 'bw_bytes': 99888324, 'io_bytes': 6040846336,
        'lat_s_mean': 0.41775487677469203, 'lat_s_min': 0.35337776000000004,
        'lat_s_max': 1.6975198690000002, 'lat_s_perc_20': 0.37958451200000004,
        'lat_s_perc_50': 0.38797312, 'lat_s_perc_90': 0.49283072000000006,
        'lat_s_perc_95': 0.526385152}}]

    Raises:
      NoValuesError: If data is not present in the json object or a key in
      `levels` is not present in the FIO output

    """

    if not fio_out:
      raise NoValuesError('No data in json object')

    job_params = self._get_job_params(fio_out)
    start_end_times = self._get_start_end_times(fio_out, job_params)
    all_jobs = []
    # Get the required metrics for every job
    for i, job in enumerate(fio_out[consts.JOBS]):
      rw = job_params[i][consts.RW]
      job_rw = job[_get_rw(rw)]
      job_metrics = {}
      for metric in REQ_JOB_METRICS:
        val = job_rw
        """
        For metric.levels=['lat_ns', 'percentile', '20.000000']
        After 1st iteration, sub = 'lat_ns', val = job_rw['lat_ns']
        After 2nd iteration, sub = 'percentile', val =
        job_rw['lat_ns']['percentile']
        After 3rd iteration, sub = '20.000000', val =
        job_rw['lat_ns']['percentile']['20.000000'] and hence we get the
        required metric value
        """
        for sub in metric.levels:
          if sub in val:
            val = val[sub]
          else:
            val = 0
            raise NoValuesError(
                f'Required metric {sub} not present in json output')

        job_metrics[metric.name] = val * metric.conversion

      start_time_s, end_time_s = start_end_times[i]

      # If start_time >= end_time or all the metrics are zero,
      # log a skip warning and continue to the next job.
      if ((start_time_s >= end_time_s) or
          (all(not value for value in job_metrics.values()))):
        # TODO(ahanadatta): Print statement will be replaced by logging.
        print(f'No job metrics in json, skipping job index {i}')
        continue

      all_jobs.append({
          consts.PARAMS: job_params[i],
          consts.START_TIME: start_time_s,
          consts.END_TIME: end_time_s,
          consts.METRICS: job_metrics
      })

    if not all_jobs:
      raise NoValuesError('No data could be extracted from file')

    return all_jobs

  def get_values_to_upload(self, jobs):
    """Get the metrics values in a list to export to Google Spreadsheet and BigQuery.

    Args:
      jobs: List of dicts, contains required metrics for each job
    Returns:
      list: A 2-d list consisting of metrics values for each job
    """

    values = []
    for job in jobs:
      row = list(job[consts.PARAMS].values())
      row.append(job[consts.START_TIME])
      row.append(job[consts.END_TIME])
      row.extend(job[consts.METRICS].values())
      values.append(row)
    return values
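
  # Sketch of the resulting row layout, assuming dict insertion order follows
  # REQ_JOB_PARAMS and REQ_JOB_METRICS:
  #   [rw, num_threads, filesize_kb, start_time, end_time, iops, bw_bytes,
  #    io_bytes, lat_s_min, lat_s_max, lat_s_mean, lat_s_perc_20,
  #    lat_s_perc_50, lat_s_perc_90, lat_s_perc_95]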

  def get_metrics(self, filepath) -> List[Dict[str, Any]]:
    """Returns job metrics obtained from given filepath.

    Args:
      filepath (str): Path of the json file to be parsed

    Returns:
      List of dicts, contains list of jobs and required metrics for each job
    """
    fio_out = self._load_file_dict(filepath)
    job_metrics = self._extract_metrics(fio_out)
    return job_metrics


if __name__ == '__main__':
  argv = sys.argv
  if len(argv) != 2:
    raise TypeError('Incorrect number of arguments.\n'
                    'Usage: '
                    'python3 -m fio.fio_metrics <fio output json filepath>')

  fio_metrics_obj = FioMetrics()
  metrics = fio_metrics_obj.get_metrics(argv[1])
  print(metrics)
