# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Module containing aerospike server installation and cleanup functions."""

import enum
import hashlib
import logging

from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import linux_packages
from perfkitbenchmarker import os_types
from perfkitbenchmarker import vm_util

FLAGS = flags.FLAGS

GIT_REPO = 'https://github.com/aerospike/aerospike-server.git'
GIT_TAG = '6.0.0.2'
AEROSPIKE_DIR = '%s/aerospike-server' % linux_packages.INSTALL_DIR

AEROSPIKE_VERSION_NAME_FOR_OS = {
    os_types.UBUNTU2004: 'ubuntu20',
    os_types.RHEL8: 'el8',
}


@enum.unique
class AerospikeEdition(enum.Enum):
  """Aerospike Edition constant."""

  COMNUNITY = 'comnunity'
  ENTERPRISE = 'enterprise'


MEMORY = 'memory'
DISK = 'disk'

DEFAULT_VERSION = '6.2.0'
DEFAULT_INSTALL_URL = (
    'https://enterprise.aerospike.com/enterprise/download/server/{}/artifact/{}_{}'
)


# Link could be found here
# https://aerospike.com/download/#servers

_AEROSPIKE_ENTERPRISE_VERSION = flags.DEFINE_string(
    'aerospike_enterprise_version', DEFAULT_VERSION, 'Aerospike version to use'
)

_AEROSPIKE_EDITION = flags.DEFINE_enum_class(
    'aerospike_edition',
    AerospikeEdition.COMNUNITY,
    AerospikeEdition,
    'The type of edition aerospike uses.',
)
flags.DEFINE_enum(
    'aerospike_storage_type',
    MEMORY,
    [MEMORY, DISK],
    (
        'The type of storage to use for Aerospike data. The type of '
        'disk is controlled by the "data_disk_type" flag.'
    ),
)

_AEROSPIKE_ZEROIZE_DISK = flags.DEFINE_bool(
    'aerospike_zeroize_disk',
    True,
    (
        'Whether to zeroize the disk. If set to false, it will run blkddiscard'
        ' to zeroize the header. If set to true, it will wipe the full disk.'
    ),
)
flags.DEFINE_integer(
    'aerospike_replication_factor',
    1,
    'Replication factor for aerospike server.',
)
flags.DEFINE_integer(
    'aerospike_service_threads', 4, 'Number of threads per transaction queue.'
)
flags.DEFINE_integer(
    'aerospike_vms', 1, 'Number of vms (nodes) for aerospike server.'
)
flags.DEFINE_integer(
    'aerospike_proto_fd_max',
    80000,
    'Maximum number of allowed file descriptors to be opened in the VM.'
)

MIN_FREE_KBYTES = 1160000


_AEROSPIKE_CONFIGS = {
    AerospikeEdition.COMNUNITY: 'aerospike_community.conf.j2',
    AerospikeEdition.ENTERPRISE: 'aerospike_enterprise.conf.j2',
}


def _GetAerospikeDir(idx=None):
  if _AEROSPIKE_EDITION.value == AerospikeEdition.COMNUNITY:
    if idx is None:
      return f'{linux_packages.INSTALL_DIR}/aerospike-server'
    else:
      return f'{linux_packages.INSTALL_DIR}/{idx}/aerospike-server'
  else:
    return '/etc/aerospike'


def _GetAerospikeConfig(idx=None):
  if _AEROSPIKE_EDITION.value == AerospikeEdition.COMNUNITY:
    return f'{_GetAerospikeDir(idx)}/as/etc/aerospike_dev.conf'
  else:
    return '~/aerospike/aerospike.conf'


def _InstallFromGit(vm):
  vm.RemoteCommand('git clone {} {}'.format(GIT_REPO, _GetAerospikeDir()))
  # Comment out Werror flag and compile. With newer compilers gcc7xx,
  # compilation is broken due to warnings.
  vm.RemoteCommand(
      'cd {0} && git checkout {1} && git submodule update --init '
      '&& sed -i "s/COMMON_CFLAGS += -Werror/# $COMMON_CFLAGS += -Werror/" '
      '{0}/make_in/Makefile.in '
      '&& make'.format(_GetAerospikeDir(), GIT_TAG)
  )
  for idx in range(FLAGS.aerospike_instances):
    vm.RemoteCommand(
        f'mkdir {linux_packages.INSTALL_DIR}/{idx}; '
        f'cp -rf {_GetAerospikeDir()} {_GetAerospikeDir(idx)}'
    )


def _InstallFromPackage(vm):
  """Installs the aerospike_server package on the VM."""
  if FLAGS.aerospike_instances != 1:
    raise NotImplementedError(
        'Only support one instance of aerospike on enterprise'
    )
  if FLAGS.os_type not in AEROSPIKE_VERSION_NAME_FOR_OS:
    raise ValueError(
        f'Unsupported OS type: {FLAGS.os_type}. Supported OS types are: '
        + ', '.join(AEROSPIKE_VERSION_NAME_FOR_OS.keys())
    )
  # https://docs.aerospike.com/server/operations/install/linux/ubuntu
  vm.RemoteCommand(
      'wget -O aerospike.tgz '
      + DEFAULT_INSTALL_URL.format(
          _AEROSPIKE_ENTERPRISE_VERSION.value,
          AEROSPIKE_VERSION_NAME_FOR_OS[FLAGS.os_type],
          'arm64' if vm.is_aarch64 else 'amd64',
      )
  )
  # Create log directory
  vm.Install('python')
  vm.InstallPackages('dpkg')
  vm.InstallPackages('netcat')
  vm.RemoteCommand('sudo mkdir -p /var/log/aerospike')

  vm.RemoteCommand('mkdir -p aerospike')
  vm.RemoteCommand('tar -xvf aerospike.tgz -C aerospike --strip-components=1')
  vm.RemoteCommand('cd ./aerospike && sudo ./asinstall')
  # lincense key file needs to be in prepvosional data directory
  vm.DownloadPreprovisionedData('./aerospike', 'aerospike', 'features.conf')
  vm.RemoteCommand(
      'sudo mv ./aerospike/features.conf /etc/aerospike/features.conf'
  )


def _Install(vm):
  """Installs the Aerospike server on the VM."""
  vm.Install('build_tools')
  vm.Install('lua5_1')
  vm.Install('openssl')
  vm.Install('wget')
  if _AEROSPIKE_EDITION.value == AerospikeEdition.COMNUNITY:
    _InstallFromGit(vm)
  else:
    _InstallFromPackage(vm)


def YumInstall(vm):
  """Installs the aerospike_server package on the VM."""
  _Install(vm)


def AptInstall(vm):
  """Installs the aerospike_server package on the VM."""
  vm.InstallPackages('netcat-openbsd zlib1g-dev')
  _Install(vm)


@vm_util.Retry(
    poll_interval=5,
    timeout=300,
    retryable_exceptions=(errors.Resource.RetryableCreationError),
)
def _WaitForServerUp(server, idx=None):
  """Block until the Aerospike server is up and responsive.

  Will timeout after 5 minutes, and raise an exception. Before the timeout
  expires any exceptions are caught and the status check is retried.

  We check the status of the server by connecting to Aerospike's out
  of band telnet management port and issue a 'status' command. This should
  return 'ok' if the server is ready. Per the aerospike docs, this always
  returns 'ok', i.e. if the server is not up the connection will fail or we
  would get no response at all.

  Args:
    server: VirtualMachine Aerospike has been installed on.
    idx: aerospike process index.

  Raises:
    errors.Resource.RetryableCreationError when response is not 'ok' or if there
      is an error connecting to the telnet port or otherwise running the remote
      check command.
  """
  try:
    if IsServerUp(server, idx):
      logging.info('Aerospike server status is OK. Server up and running.')
      return
  except errors.VirtualMachine.RemoteCommandError as e:
    raise errors.Resource.RetryableCreationError(
        'Aerospike server not up yet: %s.' % str(e)
    )
  else:
    raise errors.Resource.RetryableCreationError(
        'Aerospike server not up yet (see log for details).'
    )
  finally:
    server.RemoteCommand('sudo systemctl status aerospike')


def IsServerUp(server, idx=None):
  """Check if the Aerospike server is up and responsive.

  Args:
    server: VirtualMachine Aerospike has been installed on.
    idx: aerospike process index.

  Returns:
    A bool indicating whether the server is up.
  """
  idx = idx or 0
  address = server.internal_ip
  port = f'{idx + 3}000'
  logging.info('Trying to connect to Aerospike at %s:%s', server, port)

  out, _ = server.RemoteCommand(
      f'sudo asinfo -v "status" -p {port} -h {address}', ignore_failure=True
  )
  return out.startswith('ok')


def WipeDisk(server, devices):
  """Wipe the disk on the server."""
  for scratch_disk in server.scratch_disks:
    if scratch_disk.mount_point:
      server.RemoteCommand(
          f'sudo umount {scratch_disk.mount_point}', ignore_failure=True
      )

  # https://docs.aerospike.com/server/operations/plan/ssd/ssd_init
  # Expect to exit 1 with `No space left on device` error.
  def _WipeDevice(device):
    if _AEROSPIKE_ZEROIZE_DISK.value:
      server.RobustRemoteCommand(
          f'sudo dd if=/dev/zero of={device} bs=1M', ignore_failure=True
      )
    else:
      server.RobustRemoteCommand(f'sudo blkdiscard {device}')
      server.RobustRemoteCommand(f'sudo blkdiscard -z --length 8MiB {device}')

    background_tasks.RunThreaded(_WipeDevice, devices)

  @vm_util.Retry(
      poll_interval=5,
      timeout=300,
      retryable_exceptions=(errors.Resource.RetryableCreationError),
  )
  def _ZeroizeHeader(device):
    try:
      server.RemoteCommand(f'sudo sudo blkdiscard -z --length 8MiB {device}')
    except errors.Resource.RetryableCreationError as e:
      raise errors.VirtualMachine.RemoteCommandError(
          f'Device {device} header not zeroized: {e}.'
      )

  for device in devices:
    _ZeroizeHeader(device)


def BuildAndStartCommunityAerospike(server, idx):
  """Build and start commmunity version of aerospike."""
  server.RemoteCommand(f'cd {_GetAerospikeDir(idx)} && make init')
  # Persist the nohup command past the ssh session
  # "sh -c 'cd /wherever; nohup ./whatever > /dev/null 2>&1 &'"
  log_file = f'~/aerospike-{server.name}-{idx}.log'
  cmd = (
      f"sh -c 'cd {_GetAerospikeDir(idx)} && nohup sudo make start > "
      f"{log_file} 2>&1 &'"
  )
  server.RemoteCommand(cmd)
  server.PullFile(vm_util.GetTempDir(), log_file)
  _WaitForServerUp(server, idx)
  logging.info('Aerospike server configured and started.')
  server.RemoteCommand(
      'sudo asadm -e "show best-practices"'
  )
  # In certain cases where file descriptor is not enough, the server will
  # hanging when processing disk IOs. Logging to expose more debugging info.
  server.RemoteCommand(
      'sudo cat /var/log/aerospike/aerospike.log | grep "descriptor"'
  )


def RestartEnterpriseAerospike(server):
  server.RemoteCommand('sudo mv ~/aerospike/aerospike.conf /etc/aerospike/')
  server.RemoteCommand('sudo systemctl restart aerospike')
  _WaitForServerUp(server, 0)


def ConfigureAndStart(server, seed_node_ips=None):
  """Prepare the Aerospike server on a VM.

  Args:
    server: VirtualMachine to install and start Aerospike on.
    seed_node_ips: internal IP addresses of seed nodes in the cluster. Leave
      unspecified for a single-node deployment.
  """
  server.Install('aerospike_server')
  seed_node_ips = seed_node_ips or [server.internal_ip]

  if FLAGS.aerospike_storage_type == DISK:
    devices = [
        scratch_disk.GetDevicePath() for scratch_disk in server.scratch_disks
    ]
    WipeDisk(server, devices)
  else:
    devices = []

  # Linux best practice based on:
  # https://docs.aerospike.com/server/operations/install/linux/bestpractices#linux-best-practices
  server.RemoteCommand(
      f'echo {MIN_FREE_KBYTES * FLAGS.aerospike_instances} '
      '| sudo tee /proc/sys/vm/min_free_kbytes'
  )
  server.RemoteCommand('echo 0 | sudo tee /proc/sys/vm/swappiness')
  server.RemoteCommand('ulimit -n %d' % FLAGS.aerospike_proto_fd_max)
  for idx in range(FLAGS.aerospike_instances):
    current_devices = []
    if devices:
      num_device_per_instance = int(len(devices) / FLAGS.aerospike_instances)
      current_devices = devices[
          idx * num_device_per_instance : (idx + 1) * num_device_per_instance
      ]
    server.RenderTemplate(
        data.ResourcePath(_AEROSPIKE_CONFIGS[_AEROSPIKE_EDITION.value]),
        _GetAerospikeConfig(idx),
        {
            'devices': current_devices,
            'port_prefix': 3 + idx,
            'memory_size': int(
                server.total_memory_kb * 0.8 / FLAGS.aerospike_instances
            ),
            'seed_addresses': seed_node_ips,
            'service_threads': FLAGS.aerospike_service_threads,
            'replication_factor': FLAGS.aerospike_replication_factor,
            'proto_fd_max': FLAGS.aerospike_proto_fd_max,
            'node_id': GetNodeId(server),
            'rack_id': GetNodeId(server),
            'enable_strong_consistency': (
                FLAGS.aerospike_enable_strong_consistency
            ),
        },
    )
    if _AEROSPIKE_EDITION.value == AerospikeEdition.COMNUNITY:
      BuildAndStartCommunityAerospike(server, idx)
    else:
      RestartEnterpriseAerospike(server)


def Uninstall(vm):
  del vm


def GetNodeId(vm):
  return int(hashlib.sha1(vm.name.encode('utf-8')).hexdigest(), 16) % (10 ** 5)


def EnableStrongConsistency(vm, namespaces):
  """Enable the strong consistency feature for the given namespaces.

  Args:
    vm: the vm instance where the Aerospike server is running.
    namespaces: the Aerospike namespace to enable strong consistency.
  """
  # Grant necessary permissions
  vm.RemoteCommand(
      'sudo asadm -e "manage acl grant user admin roles sys-admin data-admin'
      ' read-write" --enable'
  )
  vm.RemoteCommand('sudo systemctl status aerospike')
  # Enable Strong Consistency
  for namespace in namespaces:
    out, _ = vm.RemoteCommand(
        f'sudo asinfo -v "roster:namespace={namespace}" | grep -oE'
        r' "observed_nodes=([@a-zA-Z0-9,]+)[\:;]?"'
    )
    nodes = out.split('=')[1]
    vm.RemoteCommand(
        f'sudo asinfo -v "roster-set:namespace={namespace};nodes={nodes}"'
    )
  vm.RemoteCommand('sudo asinfo -v "recluster:"')
