pytest/deploy/palo_env.py (467 lines of code) (raw):
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module describes the deployment and runtime environment of a Palo cluster.
Date: 2015/10/07 17:23:06
"""
import os
import pexpect
import threading
import time
import env_config
from lib import palo_client
class PaloEnv(object):
    """Deployment and runtime environment information of a Palo cluster.

    The instance tracks which hosts currently act as master, followers,
    observers and backends (BE), and offers helpers that start, stop,
    reconfigure and inspect those processes by running shell commands over
    ssh. Each host list is guarded by its own lock because several of the
    start helpers are executed from worker threads.
    """

    # Number of polling attempts used by the retry loops, and the pause in
    # seconds between two attempts.
    _RETRY_TIMES = 120
    _RETRY_INTERVAL = 1

    def __init__(self):
        """Load the initial topology and credentials from ``env_config``."""
        self.__master = env_config.master
        # Copy the configured lists: the add_*/remove_* methods mutate them,
        # and clean_start() re-reads env_config expecting the original values.
        self.__follower_list = list(env_config.follower_list)
        self.__observer_list = list(env_config.observer_list)
        self.__be_list = list(env_config.be_list)
        self.__fe_path = env_config.fe_path
        self.__be_path = env_config.be_path
        self.__host_username = env_config.host_username
        self.__host_password = env_config.host_password
        self.__fe_query_port = env_config.fe_query_port
        self.__be_heartbeat_port = self.__fe_query_port + 20
        self.__be_data_path_list = list(env_config.be_data_path_list)
        self.__master_lock = threading.Lock()
        self.__follower_lock = threading.Lock()
        self.__observer_lock = threading.Lock()
        self.__be_lock = threading.Lock()
        self.__dynamic_add_fe_list = list(env_config.dynamic_add_fe_list)
        self.__dynamic_add_be_list = list(env_config.dynamic_add_be_list)
        self.__dynamic_add_fe_lock = threading.Lock()
        self.__dynamic_add_be_lock = threading.Lock()

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def __add_unique(lock, hosts, host_name):
        """Append host_name to hosts unless present; check and append under lock."""
        with lock:
            if host_name not in hosts:
                hosts.append(host_name)

    @staticmethod
    def __discard(lock, hosts, host_name):
        """Remove host_name from hosts if present; check and remove under lock."""
        with lock:
            if host_name in hosts:
                hosts.remove(host_name)

    @staticmethod
    def __run_parallel(jobs):
        """Run every ``(target, host_name)`` job in its own thread and wait for all."""
        workers = [threading.Thread(target=target, args=(host_name,))
                   for target, host_name in jobs]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

    @staticmethod
    def __leading_int(text):
        """Return the integer formed by the leading digits of text, or None."""
        digits = ''
        for char in text:
            if not char.isdigit():
                break
            digits += char
        return int(digits) if digits else None

    # ------------------------------------------------------------------
    # master
    # ------------------------------------------------------------------
    def init(self):
        """Locate the current master; return True on success, else False."""
        return bool(self.reset_master())

    def get_master(self):
        """Return the host recorded as master (None when there is none)."""
        with self.__master_lock:
            return self.__master

    def set_master(self, host_name):
        """Record host_name as the master."""
        assert host_name
        with self.__master_lock:
            self.__master = host_name

    def remove_master(self):
        """Forget the recorded master."""
        with self.__master_lock:
            self.__master = None

    def __get_client(self, host_name, port):
        """Return an initialised PaloClient for host_name:port, or None.

        After a successful ``init()`` the method sleeps 10 seconds before
        handing the client back.
        """
        assert host_name
        client = palo_client.PaloClient(host_name, port)
        if not client.init():
            return None
        time.sleep(10)
        return client

    def reset_master(self):
        """Poll reset_master_once() until it succeeds.

        A failed attempt may simply mean that no new master has been elected
        yet, so the lookup is retried once per interval.

        Returns:
            True as soon as a master was found, False after all attempts failed.
        """
        for _ in range(self._RETRY_TIMES):
            if self.reset_master_once():
                return True
            time.sleep(self._RETRY_INTERVAL)
        return False

    def reset_master_once(self):
        """Ask every electable FE whether it is the master and record the answer.

        The recorded master (if any) is asked first, then the followers. When
        a different host turns out to be master, the previous master is moved
        to the follower list and the new one is taken out of it.

        Returns:
            True if a master was found, False otherwise.
        """
        cur_master = self.get_master()
        candidates = self.get_follower_tuple()
        if cur_master:
            candidates = (cur_master,) + candidates
        for host_name in candidates:
            client = self.__get_client(host_name, self.get_fe_query_port())
            if not client:
                # An unreachable host (for example the old master that was
                # just stopped) must not hide a master elected among the
                # remaining candidates.
                continue
            if not client.is_master():
                continue
            if host_name != cur_master:
                if cur_master:
                    self.add_follower(cur_master)
                self.remove_follower(host_name)
                self.set_master(host_name)
            return True
        return False

    # ------------------------------------------------------------------
    # follower / observer / FE bookkeeping
    # ------------------------------------------------------------------
    def get_follower_tuple(self):
        """Return a snapshot of the follower hosts."""
        with self.__follower_lock:
            return tuple(self.__follower_list)

    def add_follower(self, host_name):
        """Add a follower host (no-op if already present)."""
        assert host_name
        self.__add_unique(self.__follower_lock, self.__follower_list, host_name)

    def remove_follower(self, host_name):
        """Remove a follower host (no-op if absent)."""
        assert host_name
        self.__discard(self.__follower_lock, self.__follower_list, host_name)

    def get_observer_tuple(self):
        """Return a snapshot of the observer hosts."""
        with self.__observer_lock:
            return tuple(self.__observer_list)

    def add_observer(self, host_name):
        """Add an observer host (no-op if already present)."""
        assert host_name
        self.__add_unique(self.__observer_lock, self.__observer_list, host_name)

    def remove_observer(self, host_name):
        """Remove an observer host (no-op if absent)."""
        assert host_name
        self.__discard(self.__observer_lock, self.__observer_list, host_name)

    def get_fe_tuple(self):
        """Return all known FE hosts: master (when set), followers, observers."""
        master = self.get_master()
        head = (master,) if master else ()
        return head + self.get_follower_tuple() + self.get_observer_tuple()

    def get_dynamic_add_fe_tuple(self):
        """Return a snapshot of the dynamically added FE hosts."""
        with self.__dynamic_add_fe_lock:
            return tuple(self.__dynamic_add_fe_list)

    def add_dynamic_add_fe(self, host_name):
        """Add a dynamically added FE host (no-op if already present)."""
        assert host_name
        self.__add_unique(self.__dynamic_add_fe_lock,
                          self.__dynamic_add_fe_list, host_name)

    def remove_dynamic_add_fe(self, host_name):
        """Remove a dynamically added FE host (no-op if absent)."""
        assert host_name
        self.__discard(self.__dynamic_add_fe_lock,
                       self.__dynamic_add_fe_list, host_name)

    # ------------------------------------------------------------------
    # BE bookkeeping
    # ------------------------------------------------------------------
    def get_be_tuple(self):
        """Return a snapshot of the BE hosts."""
        with self.__be_lock:
            return tuple(self.__be_list)

    def add_be(self, host_name):
        """Add a BE host (no-op if already present)."""
        assert host_name
        self.__add_unique(self.__be_lock, self.__be_list, host_name)

    def remove_be(self, host_name):
        """Remove a BE host (no-op if absent)."""
        assert host_name
        self.__discard(self.__be_lock, self.__be_list, host_name)

    def get_dynamic_add_be_tuple(self):
        """Return a snapshot of the dynamically added BE hosts."""
        with self.__dynamic_add_be_lock:
            return tuple(self.__dynamic_add_be_list)

    def add_dynamic_add_be(self, host_name):
        """Add a dynamically added BE host (no-op if already present)."""
        assert host_name
        self.__add_unique(self.__dynamic_add_be_lock,
                          self.__dynamic_add_be_list, host_name)

    def remove_dynamic_add_be(self, host_name):
        """Remove a dynamically added BE host (no-op if absent)."""
        assert host_name
        self.__discard(self.__dynamic_add_be_lock,
                       self.__dynamic_add_be_list, host_name)

    def get_fe_query_port(self):
        """Return the FE query port."""
        return self.__fe_query_port

    def get_be_heartbeat_port(self):
        """Return the BE heartbeat port (FE query port + 20)."""
        return self.__be_heartbeat_port

    # ------------------------------------------------------------------
    # remote execution
    # ------------------------------------------------------------------
    def __exec_cmd(self, cmd, host_name, timeout=120):
        """Run cmd on host_name through ssh.

        The host-key confirmation and the password prompt are answered
        automatically. When the PALO_CLIENT_LOG_SQL environment variable is
        set, the command, exit status and output are logged.

        Returns:
            (status, output): the exit status and the captured output as text.
        """
        exe_cmd = 'ssh %s@%s "%s"' % (self.__host_username, host_name, cmd)
        prompts = {"continue connecting": "yes\n",
                   "password:": "%s\n" % self.__host_password}
        output, status = pexpect.run(exe_cmd, timeout=timeout,
                                     withexitstatus=True, events=prompts)
        # On Python 3 pexpect hands back bytes; callers compare against text.
        if isinstance(output, bytes) and not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        if 'PALO_CLIENT_LOG_SQL' in os.environ:
            palo_client.LOG.info(palo_client.L("execute CMD", exe_cmd=exe_cmd,
                                               status=status, output=output))
        return status, output

    def __is_process_alive(self, work_dir, pid_file, host_name):
        """Return True if the pid stored in work_dir/pid_file is a live process."""
        cmd = "cd %s;if [ -f %s ]; then if kill -0 `cat %s` " \
              ">/dev/null 2>&1; then echo 'running'; fi; fi" \
              % (work_dir, pid_file, pid_file)
        output = self.__exec_cmd(cmd, host_name)[1]
        return output.rstrip('\r\n') == 'running'

    def is_fe_alive(self, host_name):
        """Return True if the FE process on host_name is running."""
        return self.__is_process_alive('%s/fe' % self.__fe_path,
                                       'bin/fe.pid', host_name)

    def is_be_alive(self, host_name):
        """Return True if the BE process on host_name is running."""
        return self.__is_process_alive('%s/be' % self.__be_path,
                                       'bin/be.pid', host_name)

    # ------------------------------------------------------------------
    # FE start / stop
    # ------------------------------------------------------------------
    def __stop_fe_process(self, host_name):
        """Run stop_fe.sh on host_name until the FE is gone; True on success."""
        cmd = 'cd %s/fe;sh bin/stop_fe.sh' % (self.__fe_path)
        for _ in range(self._RETRY_TIMES):
            self.__exec_cmd(cmd, host_name)
            if not self.is_fe_alive(host_name):
                return True
            time.sleep(self._RETRY_INTERVAL)
        return False

    def __start_fe_process(self, host_name):
        """Run start_fe.sh on host_name; True once a client can connect to it."""
        cmd = 'cd %s/fe;sh bin/start_fe.sh' % (self.__fe_path)
        self.__exec_cmd(cmd, host_name)
        time.sleep(3)
        return bool(self.__get_client(host_name, self.get_fe_query_port()))

    def stop_master(self):
        """Stop the FE process of the current master and forget the master.

        Returns:
            True if the master is stopped (or none is recorded), False if the
            process is still alive after all attempts.
        """
        master = self.get_master()
        if not master:
            # Nothing to stop; avoid running ssh against a missing host name.
            return True
        if not self.__stop_fe_process(master):
            return False
        self.remove_master()
        return True

    def start_electable(self, host_name):
        """Start an electable FE; it is registered as a follower."""
        return self.start_follower(host_name)

    def start_follower(self, host_name):
        """Start the FE on host_name and register it as a follower.

        Returns:
            False if host_name is already a follower or the FE cannot be
            reached after starting, True otherwise.
        """
        if host_name in self.get_follower_tuple():
            return False
        if not self.__start_fe_process(host_name):
            return False
        self.add_follower(host_name)
        time.sleep(10)
        return True

    def stop_follower(self, host_name):
        """Stop the follower FE on host_name and unregister it.

        Returns:
            False if host_name is not a follower or the process is still alive
            after all attempts, True otherwise.
        """
        if host_name not in self.get_follower_tuple():
            return False
        if not self.__stop_fe_process(host_name):
            return False
        self.remove_follower(host_name)
        return True

    def start_observer(self, host_name):
        """Start the FE on host_name and register it as an observer.

        Returns:
            False if host_name is already an observer or the FE cannot be
            reached after starting, True otherwise.
        """
        if host_name in self.get_observer_tuple():
            return False
        if not self.__start_fe_process(host_name):
            return False
        self.add_observer(host_name)
        time.sleep(10)
        return True

    def stop_observer(self, host_name):
        """Stop the observer FE on host_name and unregister it.

        Returns:
            False if host_name is not an observer or the process is still alive
            after all attempts, True otherwise.
        """
        if host_name not in self.get_observer_tuple():
            return False
        if not self.__stop_fe_process(host_name):
            return False
        self.remove_observer(host_name)
        return True

    def restart_master(self):
        """Stop the master, start it again as an electable FE, then re-detect the master."""
        old_master = self.get_master()
        if not self.stop_master():
            return False
        if not self.start_electable(old_master):
            return False
        return bool(self.reset_master())

    def switch_master(self):
        """Master switch-over: stop the master, detect the new one, restart the old one."""
        cur_master = self.get_master()
        if not self.stop_master():
            return False
        if not self.reset_master():
            return False
        return bool(self.start_electable(cur_master))

    def restart_follower(self, host_name=None):
        """Stop and then start the follower on host_name."""
        if not self.stop_follower(host_name):
            return False
        return bool(self.start_follower(host_name))

    def restart_observer(self, host_name=None):
        """Stop and then start the observer on host_name."""
        if not self.stop_observer(host_name):
            return False
        return bool(self.start_observer(host_name))

    # ------------------------------------------------------------------
    # BE start / stop
    # ------------------------------------------------------------------
    def stop_be(self, host_name):
        """Stop the BE on host_name (stop_be.sh followed by pkill) and unregister it.

        Returns:
            True once the process is gone, False if it is still alive after
            all attempts.
        """
        cmd_a = 'cd %s/be;sh bin/stop_be.sh' % (self.__be_path)
        cmd_b = 'pkill -f %s/be/lib/palo_be' % (self.__be_path)
        for _ in range(self._RETRY_TIMES):
            self.__exec_cmd(cmd_a, host_name)
            self.__exec_cmd(cmd_b, host_name)
            if not self.is_be_alive(host_name):
                self.remove_be(host_name)
                return True
            time.sleep(self._RETRY_INTERVAL)
        return False

    def start_be(self, host_name):
        """Start the BE on host_name and register it.

        Returns:
            False if host_name is already registered or the process never
            comes up, True otherwise.
        """
        if host_name in self.get_be_tuple():
            return False
        cmd = 'cd %s/be;sh bin/start_be.sh' % (self.__be_path)
        for _ in range(self._RETRY_TIMES):
            self.__exec_cmd(cmd, host_name)
            if self.is_be_alive(host_name):
                self.add_be(host_name)
                time.sleep(10)
                return True
            time.sleep(self._RETRY_INTERVAL)
        return False

    def restart_be(self, host_name):
        """Stop and then start the BE on host_name."""
        if not self.stop_be(host_name):
            return False
        return bool(self.start_be(host_name))

    def stop_all_fe(self):
        """Stop every follower, every observer and finally the master."""
        for host_name in self.get_follower_tuple():
            if not self.stop_follower(host_name):
                return False
        for host_name in self.get_observer_tuple():
            if not self.stop_observer(host_name):
                return False
        return bool(self.stop_master())

    def stop_all_be(self):
        """Stop every registered BE."""
        for host_name in self.get_be_tuple():
            if not self.stop_be(host_name):
                return False
        return True

    # ------------------------------------------------------------------
    # configuration
    # ------------------------------------------------------------------
    def __apply_config(self, hosts, configfile_path, option_value_dict):
        """Write every option/value pair into configfile_path on each host."""
        for host_name in hosts:
            for option, value in option_value_dict.items():
                if not self.modify_config(host_name, configfile_path, option, value):
                    return False
        return True

    def update_fe_config(self, option_value_dict):
        """Stop all FEs, rewrite fe.conf on each of them and start them again.

        Args:
            option_value_dict: mapping of option name to the value to set.

        Returns:
            True if every step succeeded and a master was found afterwards.
        """
        master = self.get_master()
        follower_tuple = self.get_follower_tuple()
        observer_tuple = self.get_observer_tuple()
        fe_tuple = self.get_fe_tuple()
        if not self.stop_all_fe():
            return False
        configfile_path = '%s/fe/conf/fe.conf' % (self.__fe_path)
        if not self.__apply_config(fe_tuple, configfile_path, option_value_dict):
            return False
        if master:
            self.start_electable(master)
        self.__run_parallel(
            [(self.start_follower, host_name) for host_name in follower_tuple] +
            [(self.start_observer, host_name) for host_name in observer_tuple])
        for host_name in follower_tuple + observer_tuple:
            if not self.is_fe_alive(host_name):
                return False
        return bool(self.reset_master())

    def update_be_config(self, option_value_dict):
        """Stop all BEs, rewrite be.conf on each of them and start them again.

        Args:
            option_value_dict: mapping of option name to the value to set.

        Returns:
            True if every step succeeded and every BE is alive afterwards.
        """
        be_tuple = self.get_be_tuple()
        if not self.stop_all_be():
            return False
        configfile_path = '%s/be/conf/be.conf' % (self.__be_path)
        if not self.__apply_config(be_tuple, configfile_path, option_value_dict):
            return False
        self.__run_parallel([(self.start_be, host_name) for host_name in be_tuple])
        for host_name in be_tuple:
            if not self.is_be_alive(host_name):
                return False
        return True

    def modify_config(self, host_name, filepath, option, value):
        """Set ``option = value`` in filepath on host_name.

        A line that starts with option is rewritten in place with sed; when no
        such line exists the setting is appended. The result is then verified
        with grep.

        Returns:
            False if the edit command exits non-zero or prints anything, or if
            the verification does not find the new line; True otherwise.
        """
        cmd = "grep -q '^{option}' {filepath} && "\
              "sed -i 's/^{option}.*/{option} = {value}/g' {filepath} || "\
              "echo '{option} = {value}' >> {filepath}".format(
                  option=option, value=value, filepath=filepath)
        status, output = self.__exec_cmd(cmd, host_name)
        if status != 0 or output:
            return False
        cmd = "grep -q '^{} = {}' {}".format(option, value, filepath)
        status, output = self.__exec_cmd(cmd, host_name)
        return status == 0

    def remove_config(self, host_name, filepath, option):
        """Blank out every line of filepath that starts with option.

        Matching lines are replaced by a single space with sed, then grep
        checks that no line starting with option is left.

        Returns:
            True if no such line remains, False if grep still finds one.
        """
        cmd = 'sed -i "s/^%s.*/ /" %s' % (option, filepath)
        self.__exec_cmd(cmd, host_name)
        # The file name must stay outside the quoted pattern, otherwise grep
        # is given no file to search.
        cmd = "grep -q '^{}' {}".format(option, filepath)
        status, output = self.__exec_cmd(cmd, host_name)
        return status != 0

    # ------------------------------------------------------------------
    # clean start
    # ------------------------------------------------------------------
    def clean_fe(self, host_name):
        """Delete FE logs, metadata and temp files on host_name.

        Returns:
            The (status, output) pair of the remote command.
        """
        cmd = 'cd %s/fe;rm -rf log palo-meta/image palo-meta/bdb temp' % (self.__fe_path)
        return self.__exec_cmd(cmd, host_name)

    def clean_be(self, host_name):
        """Delete BE logs, the pid file and the content of every data path on host_name.

        Returns:
            The (status, output) pair of the remote command.
        """
        targets = ['log', 'unused', 'bin/be.pid']
        # Skip empty entries: an empty path would expand to "/*" and wipe the
        # root directory of the host.
        targets.extend('%s/*' % path for path in self.__be_data_path_list if path)
        cmd = 'cd %s/be;rm -rf %s' % (self.__be_path, ' '.join(targets))
        return self.__exec_cmd(cmd, host_name)

    def clean_start(self):
        """Wipe the cluster described by env_config and start it from scratch.

        All FEs and BEs are stopped and cleaned, the configured master is
        started first, followers, observers and BEs are started in parallel,
        and finally the backends are added through the detected master.

        Returns:
            False if any node is not alive afterwards, no master is found or
            no client can be created; otherwise the result of
            ``add_backend_list``.
        """
        self.stop_all_fe()
        self.stop_all_be()
        master = env_config.master
        follower_tuple = tuple(env_config.follower_list)
        observer_tuple = tuple(env_config.observer_list)
        be_tuple = tuple(env_config.be_list)
        for host_name in (master,) + follower_tuple + observer_tuple:
            self.clean_fe(host_name)
        for host_name in be_tuple:
            self.clean_be(host_name)
        self.start_electable(master)
        self.__run_parallel(
            [(self.start_follower, host_name) for host_name in follower_tuple] +
            [(self.start_observer, host_name) for host_name in observer_tuple])
        self.__run_parallel([(self.start_be, host_name) for host_name in be_tuple])
        for host_name in follower_tuple + observer_tuple:
            if not self.is_fe_alive(host_name):
                return False
        for host_name in be_tuple:
            if not self.is_be_alive(host_name):
                return False
        if not self.reset_master():
            return False
        client = self.__get_client(self.get_master(), self.get_fe_query_port())
        if not client:
            return False
        be_heartbeat_port = self.get_be_heartbeat_port()
        backend_list = ['%s:%s' % (be, be_heartbeat_port) for be in self.get_be_tuple()]
        return client.add_backend_list(backend_list)

    # ------------------------------------------------------------------
    # compaction checks
    # ------------------------------------------------------------------
    def __find_data_files(self, host_name, name_pattern):
        """Return the base names of files matching name_pattern under the BE data dirs."""
        search_roots = ' '.join('%s/data' % path
                                for path in self.__be_data_path_list if path)
        # Quote the pattern so that the remote shell hands the glob to find
        # instead of expanding it itself.
        cmd = "find %s -name '%s'" % (search_roots, name_pattern)
        status, output = self.__exec_cmd(cmd, host_name)
        assert not status
        file_names = []
        for line in output.splitlines():
            line = line.strip()
            if line:
                file_names.append(line.split('/')[-1])
        return file_names

    def check_be(self, host_name, tablet_id):
        """Check whether base expansion is done for tablet_id on host_name.

        Looks for files named ``<tablet_id>_0_<N>...dat`` and reports True when
        one of them has N greater than 1.

        Raises:
            AssertionError: if host_name is empty or the find command fails.
        """
        assert host_name
        for file_name in self.__find_data_files(host_name, '%s_0_*.dat' % tablet_id):
            fields = file_name.split('_')
            if len(fields) < 3:
                continue
            end_version = self.__leading_int(fields[2])
            if end_version is not None and end_version > 1:
                return True
        return False

    def check_ce(self, host_name, tablet_id):
        """Check whether a cumulative step is done for tablet_id on host_name.

        Looks for files named ``<tablet_id>_<A>_<B>...dat`` and reports True
        when one of them has A greater than 0 and A different from B.

        Raises:
            AssertionError: if host_name is empty or the find command fails.
        """
        assert host_name
        for file_name in self.__find_data_files(host_name, '%s_*.dat' % tablet_id):
            fields = file_name.split('_')
            if len(fields) < 3:
                continue
            start_version = self.__leading_int(fields[1])
            end_version = self.__leading_int(fields[2])
            if start_version is None or end_version is None:
                continue
            if start_version > 0 and start_version != end_version:
                return True
        return False