# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs YCSB against HBase.

HBase is a scalable NoSQL database built on Hadoop.
https://hbase.apache.org/

A running installation consists of:
  * An HDFS NameNode.
  * HDFS DataNodes.
  * An HBase master node.
  * HBase regionservers.
  * A zookeeper cluster (https://zookeeper.apache.org/).

See: http://hbase.apache.org/book.html#_distributed.

This benchmark provisions:
  * A single node functioning as HDFS NameNode, HBase master, and zookeeper
    quorum member.
  * '--num_vms - 1' nodes serving as both HDFS DataNodes and HBase region
    servers (so region servers and data are co-located).
By default only the master node runs Zookeeper. Some regionservers may be added
to the zookeeper quorum with the --hbase_zookeeper_nodes flag.


HBase web UI on 15030.
HDFS web UI on  50070.
"""

import functools
import logging
import os
import posixpath
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import data
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import hadoop
from perfkitbenchmarker.linux_packages import hbase
from perfkitbenchmarker.linux_packages import ycsb

FLAGS = flags.FLAGS

flags.DEFINE_string(
    'hbase_binding',
    'hbase12',
    'HBase binding to use. After '
    'YCSB 0.17.0, "hbase1x" is renamed to "hbase1".',
)
flags.DEFINE_integer('hbase_zookeeper_nodes', 1, 'Number of Zookeeper nodes.')
flags.DEFINE_boolean(
    'hbase_use_snappy', True, 'Whether to use snappy compression.'
)

BENCHMARK_NAME = 'hbase_ycsb'
BENCHMARK_CONFIG = """
hbase_ycsb:
  description: >
      Run YCSB against HBase. Specify the HBase
      cluster size with --num_vms. Specify the number of YCSB VMs
      with --ycsb_client_vms.
  vm_groups:
    master:
      vm_spec: *default_dual_core
      disk_spec: *default_500_gb
    workers:
      vm_spec: *default_dual_core
      disk_spec: *default_500_gb
    clients:
      os_type: ubuntu2204  # Python 2
      vm_spec: *default_dual_core
"""

HBASE_SITE = 'hbase-site.xml'

CREATE_TABLE_SCRIPT = 'hbase/create-ycsb-table.hbaseshell.j2'
TABLE_NAME = 'usertable'
COLUMN_FAMILY = 'cf'
TABLE_SPLIT_COUNT = 200


def GetConfig(user_config):
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  num_vms = max(FLAGS.num_vms, 2)
  if FLAGS['num_vms'].present and FLAGS.num_vms < 2:
    raise ValueError('hbase_ycsb requires at least 2 HBase VMs.')
  if FLAGS['ycsb_client_vms'].present:
    config['vm_groups']['clients']['vm_count'] = FLAGS.ycsb_client_vms
  if FLAGS['num_vms'].present:
    config['vm_groups']['workers']['vm_count'] = num_vms - 1
  return config


def CheckPrerequisites(benchmark_config):
  """Verifies that the required resources are present.

  Raises:
    perfkitbenchmarker.data.ResourceNotFound: On missing resource.
  """
  hbase.CheckPrerequisites()
  hadoop.CheckPrerequisites()
  ycsb.CheckPrerequisites()


def CreateYCSBTable(
    vm,
    table_name=TABLE_NAME,
    family=COLUMN_FAMILY,
    n_splits=TABLE_SPLIT_COUNT,
    limit_filesize=True,
    use_snappy=True,
):
  """Create a table for use with YCSB.

  Args:
    vm: Virtual machine from which to create the table.
    table_name: Name for the table.
    family: Column family name.
    limit_filesize: boolean. Should the filesize be limited to 4GB?
    n_splits: Initial number of regions for the table. Default follows
      HBASE-4163.
  """
  # See: https://issues.apache.org/jira/browse/HBASE-4163
  template_path = data.ResourcePath(CREATE_TABLE_SCRIPT)
  remote = posixpath.join(
      hbase.HBASE_DIR, os.path.basename(os.path.splitext(template_path)[0])
  )
  vm.RenderTemplate(
      template_path,
      remote,
      context={
          'table_name': table_name,
          'family': family,
          'limit_filesize': limit_filesize,
          'n_splits': n_splits,
          'use_snappy': use_snappy,
      },
  )
  # TODO(user): on HBase update, add '-n' flag.
  command = '{}/hbase shell {}'.format(hbase.HBASE_BIN, remote)
  vm.RemoteCommand(command)


def _GetVMsByRole(vm_groups):
  """Partition "vms" by role in the benchmark.

  * The first VM is the master.
  * The first FLAGS.hbase_zookeeper_nodes form the Zookeeper quorum.
  * The last FLAGS.ycsb_client_vms are loader nodes.
  * The nodes which are neither the master nor loaders are HBase region servers.

  Args:
    vm_groups: The benchmark_spec's vm_groups dict.

  Returns:
    A dictionary with keys 'vms', 'hbase_vms', 'master', 'zk_quorum', 'workers',
    and 'clients'.
  """
  hbase_vms = vm_groups['master'] + vm_groups['workers']
  vms = hbase_vms + vm_groups['clients']
  return {
      'vms': vms,
      'hbase_vms': hbase_vms,
      'master': vm_groups['master'][0],
      'zk_quorum': hbase_vms[: FLAGS.hbase_zookeeper_nodes],
      'workers': vm_groups['workers'],
      'clients': vm_groups['clients'],
  }


def Prepare(benchmark_spec):
  """Prepare the virtual machines to run hadoop.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  by_role = _GetVMsByRole(benchmark_spec.vm_groups)

  loaders = by_role['clients']
  assert loaders, 'No loader VMs: {}'.format(by_role)

  # HBase cluster
  hbase_vms = by_role['hbase_vms']
  assert hbase_vms, 'No HBase VMs: {}'.format(by_role)
  master = by_role['master']
  zk_quorum = by_role['zk_quorum']
  assert zk_quorum, 'No zookeeper quorum: {}'.format(by_role)
  workers = by_role['workers']
  assert workers, 'No workers: {}'.format(by_role)

  hbase_install_fns = [
      functools.partial(vm.Install, 'hbase') for vm in hbase_vms
  ]
  ycsb_install_fns = [functools.partial(vm.Install, 'ycsb') for vm in loaders]

  background_tasks.RunThreaded(
      lambda f: f(), hbase_install_fns + ycsb_install_fns
  )

  hadoop.ConfigureAndStart(master, workers, start_yarn=False)
  hbase.ConfigureAndStart(master, workers, zk_quorum)

  CreateYCSBTable(master, use_snappy=FLAGS.hbase_use_snappy)

  # Populate hbase-site.xml on the loaders.
  master.PullFile(
      vm_util.GetTempDir(), posixpath.join(hbase.HBASE_CONF_DIR, HBASE_SITE)
  )

  def PushHBaseSite(vm):
    conf_dir = posixpath.join(
        ycsb.YCSB_DIR, FLAGS.hbase_binding + '-binding', 'conf'
    )
    vm.RemoteCommand('mkdir -p {}'.format(conf_dir))
    vm.PushFile(
        os.path.join(vm_util.GetTempDir(), HBASE_SITE),
        posixpath.join(conf_dir, HBASE_SITE),
    )

  background_tasks.RunThreaded(PushHBaseSite, loaders)
  benchmark_spec.executor = ycsb.YCSBExecutor(FLAGS.hbase_binding)


def Run(benchmark_spec):
  """Spawn YCSB and gather the results.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  by_role = _GetVMsByRole(benchmark_spec.vm_groups)
  loaders = by_role['clients']
  logging.info('Loaders: %s', loaders)

  metadata = {
      'ycsb_client_vms': len(loaders),
      'hbase_cluster_size': len(by_role['hbase_vms']),
      'hbase_zookeeper_nodes': FLAGS.hbase_zookeeper_nodes,
      'hbase_version': hbase.GetHBaseVersion(by_role['hbase_vms'][0]),
  }

  # By default YCSB uses a BufferedMutator for Puts / Deletes.
  # This leads to incorrect update latencies, since since the call returns
  # before the request is acked by the server.
  # Disable this behavior during the benchmark run.
  run_kwargs = {'columnfamily': COLUMN_FAMILY, 'clientbuffering': 'false'}
  load_kwargs = run_kwargs.copy()

  # During the load stage, use a buffered mutator with a single thread.
  # The BufferedMutator will handle multiplexing RPCs.
  load_kwargs['clientbuffering'] = 'true'
  if not FLAGS['ycsb_preload_threads'].present:
    load_kwargs['threads'] = 1
  samples = list(
      benchmark_spec.executor.LoadAndRun(
          loaders, load_kwargs=load_kwargs, run_kwargs=run_kwargs
      )
  )
  for sample in samples:
    sample.metadata.update(metadata)

  return samples


def Cleanup(benchmark_spec):
  """Cleanup.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  by_role = _GetVMsByRole(benchmark_spec.vm_groups)
  hbase.Stop(by_role['master'])
  hadoop.StopHDFS(by_role['master'])
  background_tasks.RunThreaded(hadoop.CleanDatanode, by_role['workers'])
