#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
###########################################################################
#
# @file palo_client.py
# @date 2015/02/04 15:26:21
# @brief Palo client
#
############################################################################
"""
Palo client for Palo2 testing.
"""
# path setup
import sys
import os
# standard library
import time
import logging
import socket
import pycurl
from io import BytesIO
import pytest
import json
import pymysql
# local modules
import util
import palo_logger
import palo_job
from palo_config import BrokerInfo
from palo_exception import PaloException
from palo_exception import PaloClientException
from palo_sql import *
from palo_verify import verify, verify_by_sql
# logger and exception objects
LOG = palo_logger.Logger.getLogger()
L = palo_logger.StructedLogMessage
def get_client(host_name, port, database_name=None, user='root', password='',
charset='utf8', retry=True, http_port=None):
"""get client
"""
client = PaloClient(host_name, port, database_name, user, password, charset, http_port)
if client.init(retry):
return client
else:
raise PaloClientException('get client error: host:%s port:%s database:%s '
'user:%s password:%s' % (host_name, port, database_name, user, password))
class PaloClient(object):
"""Palo client.
    Member variables: host, port, user, password, database_name, charset, connection
"""
def __init__(self, host, port, database_name=None, user='root', password='', charset='utf8',
http_port=None):
"""
Connect to Palo FE.
"""
        # TODO: make member variables private
self.host = host
self.port = port
self.user = user
self.password = password
self.database_name = database_name
self.charset = charset
self.connection = None
self.cluster_name = 'default_cluster'
if http_port is None:
self.http_port = port - 1000
else:
self.http_port = http_port
self.palo_job_map = {}
def init(self, retry=True):
"""init
"""
assert self.host
retry_times = 0
while retry_times < 10:
try:
self.connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
passwd=self.password,
charset=self.charset)
break
except Exception as e:
LOG.error(L('Connect to Palo error', fe=str(self), error=e))
if not retry:
return False
retry_times += 1
time.sleep(1)
else:
LOG.error(L('Connect to Palo error', retry_times=retry_times))
return False
LOG.info(L("Connected to Palo", host=self.host, port=self.port, user=self.user))
database_name = self.database_name
if database_name is None:
return True
database_name_list = self.get_database_list()
if database_name not in database_name_list:
if not self.create_database(database_name):
return False
self.use(database_name)
return True
def __str__(self):
return "[host: %s, port: %d, database: %s]" % (self.host, self.port, self.database_name)
def __del__(self):
if self.connection:
self.connection.close()
    def __execute_and_rebuild_meta_class(self, sql, palo_job_cls):
        """Run sql; on the first call for palo_job_cls, bind the result columns'
        names to column-index attributes on the class, then cache that fact."""
        if palo_job_cls.__name__ in self.palo_job_map:
result = self.execute(sql)
else:
cursor, result = self.execute(sql, return_cursor=True)
description = cursor.description
idx = 0
for col_info in description:
setattr(palo_job_cls, col_info[0], idx)
idx += 1
self.palo_job_map[palo_job_cls.__name__] = True
return result
def use(self, database_name):
"""
specify default database.
"""
try:
self.database_name = database_name
self.connection.select_db(database_name)
except Exception as error:
LOG.error(L("USE database error", fe=str(self), database_name=database_name, \
error=error))
return False
return True
def connect(self):
"""
        Establish a connection to the FE.
"""
try:
self.connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
passwd=self.password,
charset=self.charset)
        except Exception:
LOG.info(L("Re-connected to Palo FE fail.",
host=self.host, port=self.port, user=self.user))
return False
LOG.info(L("Re-connected to Palo FE success.",
host=self.host, port=self.port, user=self.user))
return True
def execute(self, sql, return_rows=False, return_cursor=False):
"""
        Execute a SQL statement.
"""
        if 'PALO_CLIENT_LOG_SQL' in os.environ:
            LOG.info(L("execute SQL", sql=sql))
        if 'PALO_CLIENT_STDOUT' in os.environ:
# if sql.upper().startswith('SHOW') or sql.upper().startswith('SELECT'):
if sql.upper().startswith('SHOW PROC'):
pass
else:
print(sql.strip(';') + ';')
# avoid: table state is not normal, do not allow doing ALTER ops.
if sql.upper().startswith('ALTER'):
LOG.info(L("before alter sleep 0.5s"))
time.sleep(0.5)
cursor = self.connection.cursor()
try:
rows = cursor.execute(sql)
except pymysql.err.InternalError:
self.connection.ping(reconnect=True)
rows = cursor.execute(sql)
        if not return_rows and not return_cursor:
            return cursor.fetchall()
        elif return_rows:
            return rows, cursor.fetchall()
        else:
            return cursor, cursor.fetchall()
def verify(self, expected_file_list, table_name, database_name=None,
save_file_list=None, encoding='utf8', cluster_name=None, key_desc='aggregate'):
"""
verify
"""
LOG.info(L("check file:", file=expected_file_list))
database_name = self.database_name if database_name is None else database_name
        # get the table schema via DESC
schema = self.desc_table(table_name, database_name)
data = self.select_all(table_name, database_name)
return verify(expected_file_list, data, schema, table_name, database_name, encoding, save_file_list)
def verify_by_sql(self, expected_file_list, sql, schema, save_file_list=None):
"""verify by sql"""
LOG.info(L("check file:", file=expected_file_list))
        # run the query and verify the result against the expected files
data = self.execute(sql)
return verify_by_sql(expected_file_list, data, schema, 'check_by_sql', self.database_name, None, save_file_list)
def create_database(self, database_name, cluster_name=None):
"""
        Create a database.
"""
sql = "CREATE DATABASE %s" % database_name
self.execute(sql)
if not cluster_name:
cluster_name = self.cluster_name
database_name_list = self.get_database_list(cluster_name)
self.use(database_name)
if database_name in database_name_list:
self.database_name = database_name
LOG.info(L("CREATE DATABASE succ", database_name=database_name, fe=str(self)))
return True
else:
LOG.warning(L("CREATE DATABASE fail", database_name=database_name, fe=str(self)))
return False
def create_table(self, table_name, column_list,
partition_info=None, distribution_info=None, storage_type=None,
storage_medium=None, storage_cooldown_time=None, bloom_filter_column_list=None,
replication_num=None, database_name=None, set_null=False, keys_desc=None,
bitmap_index_list=None, dynamic_partition_info=None, replication_allocation=None,
light_schema_change=None, enable_unique_key_merge_on_write=None):
"""
Create table
Attributes:
            column_list: a list of 4-tuples (name, type, agg_type, default_value); the
                last two items may be omitted. Note: to give a key column a default
                value, set its agg_type to None.
                Example: [("k1", "int"), ("k2", "char", None, ""), ("v", "date", "replace")]
            bitmap_index_list: a list of 3-tuples (index_name, column_name, index_type)
                Example: [("k1_index", "k1", "BITMAP")]
            partition_info: a PartitionInfo object
            keys_desc: e.g. "AGGREGATE KEY(k1)"
            dynamic_partition_info: parameters for creating dynamic partitions
            replication_allocation: resource group for the replicas
            enable_unique_key_merge_on_write: whether a unique table uses the
                merge-on-write update mode; enabled by default
        deprecated:
            storage_type:
            distribution:
        added:
            three data models: aggregate, duplicate, unique
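        Example (illustrative sketch; DistributionInfo comes from palo_sql):
            client.create_table('table_1',
                                [('k1', 'int'), ('v1', 'int', 'sum')],
                                distribution_info=DistributionInfo('HASH(k1)', 10),
                                keys_desc='AGGREGATE KEY(k1)')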
"""
database_name = database_name if database_name is not None else self.database_name
# table name
sql = 'CREATE TABLE %s.%s (' % (database_name, table_name)
# columns
key_columns = list()
aggregate_flag = False
for column in column_list:
sql = '%s %s,' % (sql, util.column_to_sql(column, set_null))
if len(column) == 2 or column[2] is None:
key_columns.append(column[0])
if len(column) > 2 and column[2] is not None and column[2].upper() in \
['MAX', 'MIN', 'SUM', 'REPLACE', 'HLL_UNION', 'REPLACE_IF_NOT_NULL']:
aggregate_flag = True
if bitmap_index_list is not None:
for bitmap_index in bitmap_index_list:
sql = '%s %s,' % (sql, util.bitmap_index_to_sql(bitmap_index))
sql = '%s )' % sql.rstrip(',')
if keys_desc is None:
if aggregate_flag:
keys_desc = 'AGGREGATE KEY(%s)' % ','.join(key_columns)
else:
keys_desc = ''
sql = '%s %s' % (sql, keys_desc)
# partition
if partition_info:
sql = '%s %s' % (sql, str(partition_info))
# distribution
if distribution_info is None:
distribution_info = DistributionInfo('HASH(%s)' % column_list[0][0], 5)
elif isinstance(distribution_info, DistributionInfo) and \
distribution_info.distribution_type.upper() == 'RANDOM':
distribution_info.distribution_type = 'HASH(%s)' % column_list[0][0]
        elif isinstance(distribution_info, str):
            # str.replace returns a new string; reassign so the edit takes effect
            distribution_info = distribution_info.replace(' RANDOM', ' HASH(%s)' % column_list[0][0])
            distribution_info = distribution_info.replace(' random', ' HASH(%s)' % column_list[0][0])
else:
pass
sql = '%s %s' % (sql, str(distribution_info))
# properties
sql = '%s PROPERTIES (' % (sql)
if storage_medium is not None:
sql = '%s "storage_medium"="%s",' % (sql, storage_medium)
if storage_cooldown_time is not None:
sql = '%s "storage_cooldown_time"="%s",' % (sql, storage_cooldown_time)
if bloom_filter_column_list is not None:
sql = '%s "bloom_filter_columns"="%s",' % (sql, ",".join(bloom_filter_column_list))
if replication_num is not None:
sql = '%s "replication_num"="%s",' % (sql, replication_num)
if dynamic_partition_info is not None:
sql = '%s %s' % (sql, str(dynamic_partition_info))
        if replication_allocation is not None:
            sql = '%s "replication_allocation"="%s",' % (sql, replication_allocation)
        if light_schema_change is not None:
            sql = '%s "light_schema_change"="%s",' % (sql, light_schema_change)
        if enable_unique_key_merge_on_write is not None:
            sql = '%s "enable_unique_key_merge_on_write"="%s",' % (sql, enable_unique_key_merge_on_write)
        if sql.endswith(' PROPERTIES ('):
            # rstrip strips a character set, not a suffix, so slice the suffix off instead
            sql = sql[:-len(' PROPERTIES (')]
else:
sql = '%s )' % (sql.rstrip(','))
try:
ret = self.execute(sql)
except Exception as e:
LOG.warning(L('CREATE TABLE fail.', table_name=table_name, msg=str(e)))
if "Failed to find" in str(e) or \
"replication num should be less than the number of available backends" in str(e):
alive_be = self.get_alive_backend_list()
if len(alive_be) < 3:
LOG.warning(L('SKIP: some backends are dead, create table failed, skip test case '))
raise pytest.skip("some backends are dead, create table failed, skip test case")
else:
raise e
else:
raise e
if ret == ():
LOG.info(L('CREATE TABLE succ.', database_name=database_name,
table_name=table_name))
return True
else:
LOG.info(L('CREATE TABLE fail.', database_name=database_name,
table_name=table_name))
return False
def create_table_like(self, table_name, source_table_name, database_name=None,
source_database_name=None, rollup_list=None, external=False,
if_not_exists=False):
"""help create table like see more info"""
sql = 'CREATE {external}TABLE {if_not_exists}{database_name}{table_name} LIKE ' \
'{source_database_name}{source_table_name}{with_rollup}'
external = 'EXTERNAL' if external else ''
if_not_exists = 'IF NOT EXISTS ' if if_not_exists else ''
database_name = database_name + '.' if database_name else ''
with_rollup = ' ROLLUP (%s)' % ','.join(rollup_list) if rollup_list else ''
source_database_name = source_database_name + '.' if source_database_name else ''
sql = sql.format(external=external, if_not_exists=if_not_exists, database_name=database_name,
table_name=table_name, source_database_name=source_database_name,
source_table_name=source_table_name, with_rollup=with_rollup)
ret = self.execute(sql)
return ret == ()
def create_rollup_table(self, table_name, rollup_table_name, column_name_list,
storage_type=None, database_name=None, base_index_name=None,
is_wait=False, force_alter=False, cluster_name=None, after_column_name_list=''):
"""
        Create a rollup table.
        TODO: storage_type
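        Example (illustrative):
            client.create_rollup_table('table_1', 'rollup_1', ['k1', 'v1'], is_wait=True)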
"""
database_name = database_name if database_name is not None else self.database_name
sql = 'ALTER TABLE %s.%s ADD ROLLUP %s (%s) %s' % (database_name, table_name,
rollup_table_name, ','.join(column_name_list),
after_column_name_list)
if base_index_name:
sql = "%s FROM %s" % (sql, base_index_name)
if storage_type or force_alter:
sql = '%s PROPERTIES(' % sql
            if storage_type:
                # note: the passed-in value is ignored; column storage is always requested
                sql = '%s "storage_type"="column",' % sql
if force_alter:
sql = '%s "force_alter"="true",' % sql
sql = '%s)' % sql.rstrip(',')
ret = self.execute(sql)
if ret != ():
LOG.info(L("CREATE ROLLUP TABLE fail.", database_name=database_name,
table_name=table_name,
rollup_table_name=rollup_table_name))
return False
ret = True
if is_wait:
ret = self.wait_table_rollup_job(table_name, cluster_name=cluster_name,
database_name=database_name)
return ret
LOG.info(L("CREATE ROLLUP TABLE succ.", database_name=database_name,
table_name=table_name,
rollup_table_name=rollup_table_name))
return ret
def cancel_rollup(self, table_name, database_name=None):
"""
        Cancel a rollup job.
"""
database_name = database_name if database_name is not None else self.database_name
sql = 'CANCEL ALTER TABLE ROLLUP FROM %s.%s' % (database_name, table_name)
ret = self.execute(sql)
if ret == ():
LOG.info(L("CANCEL ALTER ROLLUP succ.", database_name=database_name,
table_name=table_name))
return True
else:
LOG.info(L("CANCEL ALTER ROLLUP fail.", database_name=database_name,
table_name=table_name))
return False
def create_materialized_view(self, table_name, materialized_view_name, view_sql,
database_name=None, is_wait=False):
"""
create_materialized_view
:param table_name:
:param materialized_view_name:
:param view_sql:
:param database_name:
:param is_wait:
:return:
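        Example (illustrative):
            client.create_materialized_view('table_1', 'mv_1',
                                            'SELECT k1, SUM(v1) FROM table_1 GROUP BY k1',
                                            is_wait=True)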
"""
database_name = database_name if database_name is not None else self.database_name
sql = 'CREATE MATERIALIZED VIEW %s AS %s' % (materialized_view_name, view_sql)
        # sleep 1s before issuing the statement
time.sleep(1)
ret = self.execute(sql)
if ret != ():
LOG.info(L("CREATE MATERIALIZED VIEW fail.", database_name=database_name,
materialized_view_name=materialized_view_name))
return False
ret = True
if is_wait:
ret = self.wait_table_rollup_job(table_name, database_name=database_name)
return ret
LOG.info(L("CREATE MATERIALIZED VIEW succ.", database_name=database_name,
table_name=table_name, materialized_view_name=materialized_view_name, ))
return ret
def drop_materialized_view(self, database_name, table_name, view_name):
"""
        drop_materialized_view
        There is no standalone delete statement yet (still under development);
        the view is dropped via the ALTER TABLE style DROP ... ON syntax.
:param database_name:
:param table_name:
:param view_name:
:return:
"""
sql = 'DROP MATERIALIZED VIEW IF EXISTS %s ON %s.%s' % (view_name, database_name, table_name)
ret = self.execute(sql)
return ret
def get_index_list(self, table_name, database_name=None):
"""
        Get all index names of a table.
"""
if not table_name:
return None
ret = self.desc_table(table_name, database_name, is_all=True)
idx_list = util.get_attr(ret, palo_job.DescInfoAll.IndexName)
while '' in idx_list:
idx_list.remove('')
return idx_list
def get_index(self, table_name, index_name=None, database_name=None):
"""
        Get the index with the given name from the table; if no name is given,
        return the default index info.
"""
if not table_name:
return None
if index_name is None:
index_name = table_name
idx_list = self.get_index_list(table_name, database_name)
if index_name in idx_list:
return index_name
else:
LOG.info(L('can not get index from table', index_name=index_name, table_name=table_name))
return None
def get_index_schema(self, table_name, index_name=None, database_name=None):
"""
Get table schema
"""
if index_name is None:
return self.desc_table(table_name, database_name, is_all=False)
ret = self.desc_table(table_name, database_name, is_all=True)
        record = False
        return_ret = list()
        for item in ret:
            desc_column = palo_job.DescInfoAll(item)
            row_index_name = desc_column.get_index_name()
            if not record:
                if row_index_name != index_name:
                    continue
                record = True
            elif row_index_name != '':
                # a row with a new non-empty IndexName starts the next index block;
                # stop collecting instead of appending rows of other indexes
                break
            return_ret.append((desc_column.get_field(), desc_column.get_type(), desc_column.get_null(),
                               desc_column.get_key(), desc_column.get_default(), desc_column.get_extra()))
        return tuple(return_ret)
def set_time_zone(self, zone, global_var=False):
"""设置系统的时区
zone:要设置的时区,是否要global参数
"""
if global_var:
sql = "set global time_zone = '%s'" % zone
else:
sql = "set time_zone = '%s'" % zone
LOG.info(L('palo sql', sql=sql))
        self.execute(sql)
def set_sql_mode(self, sql_mode=None):
"""set_sql_mode"""
sql_mode = "PIPES_AS_CONCAT" if sql_mode is None else sql_mode
sql = "set sql_mode = %s" % sql_mode
LOG.info(L("palo sql", sql=sql))
res = self.execute(sql)
return res
def get_sql_mode(self):
"""get sql mode"""
sql = "select @@sql_mode"
LOG.info(L("palo sql", sql=sql))
p_res = self.execute(sql)
return p_res
def wait_load_job(self, load_label, database_name=None, cluster_name=None, timeout=1800):
"""
wait load job
"""
database_name = database_name if database_name is not None else self.database_name
LOG.debug(L("wait load job.", load_label=load_label, database_name=database_name))
retry_times = 0
sql = 'SHOW LOAD FROM %s WHERE LABEL="%s"' % (database_name, load_label)
while retry_times < 10 and timeout > 0:
time.sleep(1)
timeout -= 1
load_job = self.execute(sql)
if len(load_job) == 0:
retry_times += 1
continue
state = palo_job.LoadJob(load_job[-1]).get_state()
if state.upper() != "FINISHED" and state.upper() != "CANCELLED":
continue
elif state.upper() == "FINISHED":
return True
else:
LOG.info(L("LOAD FAILED.", database_name=database_name,
msg=palo_job.LoadJob(load_job[-1]).get_errormsg(),
url=palo_job.LoadJob(load_job[-1]).get_url()))
return False
else:
LOG.info(L("LOAD LABEL NOT EXIST", load_label=load_label, database_name=database_name))
return False
def bulk_load(self, table_name, load_label, data_file, max_filter_ratio=None,
column_name_list=None, timeout=None, database_name=None, host=None, port=None,
user=None, password=None, is_wait=False, cluster_name=None,
hll_column_list=None, column_separator=None, backend_id=None):
"""
Args:
table_name: string, table name
load_label: string, load label
            data_file: string, the local file to load
max_filter_ratio: float, max_filter_ratio
column_name_list: list, column name list
timeout: int, timeout
database_name: string, database name
host: string, fe host
port: int, fe http port
user: string, fe user
password: string, fe password
is_wait: True/False, is wait
cluster_name: string, cluster name
            hll_column_list: list, like ['hll_column1,k1', 'hll_column,k2']
            backend_id: kept only for backward compatibility with older code
Returns:
True: mini load success
False: mini load Fail
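        Example (illustrative):
            ok = client.bulk_load('table_1', 'label_1', '/tmp/data.txt',
                                  max_filter_ratio=0.05, is_wait=True)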
"""
database_name = database_name if database_name is not None else self.database_name
host = self.host if host is None else host
port = self.http_port if port is None else port
user = self.user if user is None else user
password = self.password if password is None else password
url = 'http://%s:%s/api/%s/%s/_load?label=%s' % (host, port, database_name,
table_name, load_label)
if max_filter_ratio:
url = '%s&max_filter_ratio=%s' % (url, max_filter_ratio)
if column_name_list:
url = '%s&columns=%s' % (url, ','.join(column_name_list))
if hll_column_list:
url = '%s&hll=%s' % (url, ':'.join(hll_column_list))
if timeout:
url = '%s&timeout=%s' % (url, timeout)
if column_separator:
url = '%s&column_separator=%s' % (url, column_separator)
cmd = 'curl --location-trusted -u %s:%s -T %s %s' % (user, password, data_file, url)
print(cmd)
LOG.info(L('bulk multi load', cmd=cmd))
        # open in binary mode for the upload
        file = open(data_file, 'rb')
c = pycurl.Curl()
buf = BytesIO()
c.setopt(c.URL, url)
c.setopt(pycurl.WRITEFUNCTION, buf.write)
        # HTTP basic auth
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
c.setopt(pycurl.USERNAME, user)
c.setopt(pycurl.PASSWORD, password)
        # upload the file
c.setopt(pycurl.UPLOAD, 1)
c.setopt(pycurl.READDATA, file)
        # follow redirects with credentials, like curl --location-trusted
c.setopt(pycurl.UNRESTRICTED_AUTH, 1)
c.setopt(pycurl.FOLLOWLOCATION, True)
LOG.info(L("BULK LOAD.", url=url, file_name=data_file))
        msg = None
        try:
            c.perform()
            msg = buf.getvalue()
            LOG.info(L("BULK LOAD ret.", ret=msg))
            print(c.getinfo(c.HTTP_CODE))
            # if c.getinfo(c.HTTP_CODE) == 200:
            if c.getinfo(c.HTTP_CODE):
                ret = json.loads(msg)
                status = ret.get('status')
                if status == 'Success':
                    if is_wait:
                        return self.wait_load_job(load_label, database_name, cluster_name=cluster_name)
                    return True
                return False
        except Exception as e:
            LOG.info(L("BULK LOAD failed.", err=str(e), msg=msg))
            return False
        finally:
            # release the curl handle and the uploaded file
            c.close()
            file.close()
def batch_load(self, load_label, load_data_list, max_filter_ratio=None, timeout=None,
hadoop_info=None, by_load_cluster=None, property_list=None,
database_name=None, is_wait=False, cluster_name=None, broker=None,
strict_mode=None, timezone=None, temp_partition=None):
"""
Load data
Attributes:
            load_data_list: a LoadDataInfo object or a list of them
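        Example (illustrative; the LoadDataInfo constructor arguments are assumptions):
            data = LoadDataInfo('hdfs://host:port/path/file.txt', 'table_1')
            client.batch_load('label_1', data, max_filter_ratio=0.1, is_wait=True)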
"""
database_name = self.database_name if database_name is None else database_name
data_list = list()
if isinstance(load_data_list, LoadDataInfo):
data_list.append(str(load_data_list))
elif isinstance(load_data_list, list):
for data in load_data_list:
data_list.append(str(data))
else:
raise PaloClientException('Load data list should be list or LoadDataInfo',
load_data_list=load_data_list)
sql = 'LOAD LABEL %s.%s (%s)' % (database_name, load_label,
', '.join(data_list))
if by_load_cluster is not None:
sql = '%s BY "%s"' % (sql, by_load_cluster)
if broker is not None:
sql = '%s %s' % (sql, str(broker))
sql = '%s %s' % (sql, 'PROPERTIES(')
if max_filter_ratio is not None:
sql = '%s "max_filter_ratio"="%s",' % (sql, max_filter_ratio)
if timeout is not None:
sql = '%s "timeout"="%d",' % (sql, timeout)
if strict_mode is not None:
sql = '%s "strict_mode"="%s",' % (sql, strict_mode)
if timezone is not None:
sql = '%s "timezone"="%s",' % (sql, timezone)
if hadoop_info is not None:
sql = '%s %s' % (sql, str(hadoop_info))
if property_list:
sql = sql + ', '.join(property_list) + ','
        if sql.endswith(','):
            sql = sql.rstrip(',')
            sql = '%s %s' % (sql, ')')
        else:
            # no properties were appended; slice off the dangling ' PROPERTIES(' suffix
            sql = sql[:-len(' PROPERTIES(')]
ret = self.execute(sql)
if ret != ():
LOG.info(L('LOAD fail.', database_name=database_name, load_label=load_label))
return False
if is_wait:
ret = self.wait_load_job(load_label, database_name, cluster_name=cluster_name)
if not ret:
LOG.info(L('LOAD fail.', database_name=database_name, load_label=load_label))
return False
LOG.info(L('LOAD succ.', database_name=database_name, load_label=load_label))
return True
def cancel_load(self, load_label, database_name=None):
"""
        Cancel a load job.
"""
database_name = self.database_name if database_name is None else database_name
sql = 'CANCEL LOAD FROM %s WHERE LABEL = "%s"' % (database_name, load_label)
ret = self.execute(sql)
if ret == ():
LOG.info(L("CANCEL LOAD succ.", database_name=database_name, label=load_label))
return True
else:
LOG.info(L("CANCEL LOAD fail.", database_name=database_name, label=load_label))
return False
def get_quota(self, database_name, cluster_name=None):
"""
get quota
"""
if cluster_name:
database_name = '%s:%s' % (cluster_name, database_name)
sql = "SHOW PROC '/dbs'"
result = self.execute(sql)
for info in result:
if info[1] == database_name:
return info[3]
LOG.warning(L("Get quota fail.", database_name=database_name))
return None
def alter_database(self, database_name, quota):
"""
alter database
"""
sql = "ALTER DATABASE %s SET DATA QUOTA %d" % (database_name, quota)
result = self.execute(sql)
if result != ():
LOG.error(L("ALTER DATABASE fail.", database_name=database_name, quota=quota))
return False
return True
def drop_database(self, database_name=None, force=None):
"""
        Drop a database.
        Parameters:
            database_name: if None, drop the current default database. Type: str.
        Returns:
            False: drop failed
            True: drop succeeded
        Raises:
            PaloClientException: on drop error
"""
database_name = self.database_name if database_name is None else database_name
if self.database_name == database_name:
self.database_name = None
sql = "DROP DATABASE %s" % database_name
if force is True:
sql = "DROP DATABASE %s FORCE" % database_name
self.execute(sql)
database_name_list = self.get_database_list()
if database_name in database_name_list:
LOG.warning(L("DROP DATABASE fail.", database_name=database_name))
return False
else:
LOG.info(L("DROP DATABASE succ.", database_name=database_name))
return True
def clean(self, database_name=None):
"""
        Clean up all data.
        Parameters:
            database_name: if None, drop every database. Type: str.
Returns:
None
"""
if database_name is None:
database_name_list = self.get_database_list()
for database_name in database_name_list:
if database_name.find("information_schema") == -1:
self.clean(database_name)
else:
database_name_list = self.get_database_list()
while database_name in database_name_list:
try:
self.drop_database(database_name, force=True)
except PaloClientException:
pass
database_name_list = self.get_database_list()
def drop_table(self, table_name, database_name=None, cluster_name=None, if_exist=False):
"""删除table family
Parameters:
database_name:如果为None,将删除默认的database. [Type str]
table_family_name: table family name. [Type str]
Returns:
目前无返回值
Raises:
没有捕获,可能抛出
"""
database_name = self.database_name if database_name is None else database_name
if not if_exist:
sql = 'DROP TABLE'
else:
sql = 'DROP TABLE IF EXISTS'
sql = "%s %s.%s" % (sql, database_name, table_name)
self.execute(sql)
return True
def drop_rollup_table(self, table_name, rollup_table_name, database_name=None):
"""删除rollup table
Parameters:
database_name:如果为None,将删除默认的database. [Type str]
table_family_name: table family name. [Type str]
Returns:
目前无返回值
Raises:
没有捕获,可能抛出
"""
database_name = self.database_name if database_name is None else database_name
sql = "ALTER TABLE %s.%s DROP ROLLUP %s" % \
(database_name, table_name, rollup_table_name)
ret = self.execute(sql)
return ret == ()
def select_all(self, table_name, database_name=None):
"""
select all
"""
database_name = self.database_name if database_name is None else database_name
sql = "SELECT * FROM %s.%s" % (database_name, table_name)
result = self.execute(sql)
return result
def query(self, sql):
"""
query
"""
result = self.execute(sql)
return result
def get_load_job_state(self, label, database_name=None, cluster_name=None):
"""
get load job state
"""
load_job_list = self.get_load_job_list(database_name=database_name, \
cluster_name=cluster_name)
for load_job in load_job_list:
job = palo_job.LoadJob(load_job)
if job.get_label() == label:
return job.get_state()
return None
def get_unfinish_load_job_list(self, database_name=None, cluster_name=None):
"""
        Get all unfinished load jobs, i.e. jobs in PENDING, ETL or LOADING state.
"""
load_job_list = self.get_load_job_list( \
database_name=database_name, cluster_name=cluster_name)
result = list()
for load_job in load_job_list:
job = palo_job.LoadJob(load_job)
if job.get_state() != "FINISHED" and job.get_state() != "CANCELLED":
result.append(load_job)
return result
def get_load_job_list(self, state=None, database_name=None, cluster_name=None):
"""
        Get load jobs, optionally filtered by job state.
"""
database_name = self.database_name if database_name is None else database_name
sql = 'SHOW LOAD FROM %s' % database_name
job_list = self.__execute_and_rebuild_meta_class(sql, palo_job.LoadJob)
result = list()
if state:
state = state.upper()
for job in job_list:
if palo_job.LoadJob(job).get_state() == state:
result.append(job)
return result
else:
return job_list
def get_load_job(self, label, database_name=None, cluster_name=None):
"""
get load job, according label get load job
"""
load_job_list = self.get_load_job_list(database_name=database_name, \
cluster_name=cluster_name)
for load_job in load_job_list:
if palo_job.LoadJob(load_job).get_label() == label:
return load_job
return None
def get_delete_job_list(self, state=None, database_name=None, cluster_name=None):
"""
        Get delete jobs, optionally filtered by state.
"""
database_name = self.database_name if database_name is None else database_name
if not cluster_name:
cluster_name = self.cluster_name
if cluster_name:
database_name = '%s:%s' % (cluster_name, database_name)
sql = 'SHOW DELETE FROM %s' % database_name
job_list = self.execute(sql)
result = list()
if state:
state = state.upper()
for job in job_list:
if palo_job.DeleteJob(job).get_state() == state:
result.append(job)
return result
else:
return job_list
def wait_table_rollup_job(self, table_name, database_name=None, cluster_name=None,
state='FINISHED', timeout=2000):
"""
        Wait for the rollup job to reach the given state.
"""
time.sleep(5)
database_name = self.database_name if database_name is None else database_name
retry_times = 10
while timeout > 0:
job_list = self.get_table_rollup_job_list(table_name,
database_name, cluster_name=cluster_name)
if not job_list:
retry_times -= 1
if retry_times == 0:
LOG.info(L("CANNOT GET ROLLUP JOB.", database_name=database_name))
break
time.sleep(1)
continue
last_job_state = palo_job.RollupJob(job_list[-1]).get_state()
if last_job_state == state:
LOG.info(L("GET ROLLUP JOB STATE.", database_name=database_name,
state=palo_job.RollupJob(job_list[-1]).get_state()))
return True
if last_job_state == 'CANCELLED':
LOG.info(L("ROLLUP JOB CANCELLED.", database_name=database_name,
msg=palo_job.RollupJob(job_list[-1]).get_msg()))
return False
if state != 'FINISHED' and last_job_state == 'FINISHED':
LOG.info(L("ROLLUP JOB FINISHED.", database_name=database_name))
return False
time.sleep(3)
timeout -= 3
LOG.warning(L("WAIT ROLLUP JOB TIMEOUT.", database_name=database_name))
return False
def wait_table_load_job(self, database_name=None, timeout=1800):
"""等待db的所有load任务完成"""
database_name = self.database_name if database_name is None else database_name
flag = False
while timeout > 0:
flag = True
load_job_list = self.execute('SHOW LOAD FROM %s' % database_name)
for job in load_job_list:
LOG.info(L("LOAD JOB STATE.", database_name=database_name,
state=job[palo_job.LoadJob.State]))
if job[palo_job.LoadJob.State] != 'CANCELLED' and \
job[palo_job.LoadJob.State] != 'FINISHED':
LOG.info(L("LOAD JOB RUNNING.", database_name=database_name,
state=job[palo_job.LoadJob.State]))
flag = False
break
time.sleep(3)
timeout -= 3
if flag:
LOG.info(L("LOAD JOB FINISHED.", database_name=database_name))
break
LOG.info(L("WAIT LOAD JOB FINISHED.", database_name=database_name))
def get_table_rollup_job_list(self, table_name, database_name=None, cluster_name=None):
"""
        Get rollup jobs of the given table.
"""
database_name = self.database_name if database_name is None else database_name
sql = 'SHOW ALTER TABLE ROLLUP FROM %s WHERE TableName = "%s"' % (database_name, table_name)
database_rollup_job_list = self.__execute_and_rebuild_meta_class(sql, palo_job.RollupJob)
table_rollup_job_list = []
for rollup_job in database_rollup_job_list:
if palo_job.RollupJob(rollup_job).get_table_name() == table_name:
table_rollup_job_list.append(rollup_job)
return table_rollup_job_list
def get_database_rollup_job_list(self, database_name=None, cluster_name=None):
"""
        Get all rollup jobs of the database.
"""
database_name = self.database_name if database_name is None else database_name
        if cluster_name:
            database_name = '%s:%s' % (cluster_name, database_name)
sql = "SHOW PROC '/jobs/%s/rollup'" % self.get_database_id(database_name)
return self.execute(sql)
def get_database_list(self, cluster_name=None):
"""
        List all database names.
        Parameters:
            None
        Returns:
            database name list
        Raises:
            PaloClientException: on query error
"""
sql = r"SHOW DATABASES"
result = self.execute(sql)
database_name_list = [name[0] for name in result]
return database_name_list
def get_partition_list(self, table_name, database_name=None, cluster_name=None):
"""
get table families
"""
database_name = self.database_name if database_name is None else database_name
if cluster_name:
database_name = '%s:%s' % (cluster_name, database_name)
sql = "SHOW PARTITIONS FROM %s.%s" % (database_name, table_name)
result = self.__execute_and_rebuild_meta_class(sql, palo_job.PartitionInfo)
return result
def get_partition(self, table_name, partition_name, database_name=None, cluster_name=None):
"""
        Get the specified partition.
"""
partition_list = self.get_partition_list(table_name, database_name, \
cluster_name=cluster_name)
for partition in partition_list:
if partition[palo_job.PartitionInfo.PartitionName] == partition_name:
return partition
return None
def get_partition_id(self, table_name, partition_name, database_name=None):
"""
        get partition id.
        Parameters:
            database_name
            partition_name
        Returns:
            None: if the partition does not exist
"""
partition = self.get_partition(table_name, \
partition_name, database_name)
if partition:
return partition[palo_job.PartitionInfo.PartitionId]
else:
return None
def get_partition_name_by_id(self, table_name, partition_id, \
database_name=None, cluster_name=None):
"""get partition name by id
"""
partition_list = self.get_partition_list(table_name, \
database_name, cluster_name=cluster_name)
for partition in partition_list:
if partition[palo_job.PartitionInfo.PartitionId] == partition_id:
return partition[palo_job.PartitionInfo.PartitionName]
return None
def get_partition_version(self, table_name, \
partition_name, database_name=None, cluster_name=None):
"""
        Get the visible version of the partition.
"""
partition = self.get_partition(table_name, \
partition_name, database_name, cluster_name=cluster_name)
if partition:
return partition[palo_job.PartitionInfo.VisibleVersion]
else:
return None
def get_partition_storage_medium(self, table_name, partition_name, \
database_name=None, cluster_name=None):
"""
        get the partition's storage medium.
        Parameters:
            database_name
            partition_name
        Returns:
            None: if the partition does not exist
"""
partition = self.get_partition(table_name, \
partition_name, database_name, cluster_name=cluster_name)
if partition:
return partition[palo_job.PartitionInfo.StorageMedium]
else:
return None
def get_partition_cooldown_time(self, table_name, partition_name, \
database_name=None, cluster_name=None):
"""
        get the partition's cooldown time.
        Parameters:
            database_name
            partition_name
        Returns:
            None: if the partition does not exist
"""
partition = self.get_partition(table_name,
partition_name, database_name, cluster_name=cluster_name)
if partition:
return partition[palo_job.PartitionInfo.CooldownTime]
else:
return None
def get_partition_replication_num(self, table_name, partition_name,
database_name=None, cluster_name=None):
"""
        get the partition's replication num.
        Parameters:
            database_name
            partition_name
        Returns:
            None: if the partition does not exist
"""
partition = self.get_partition(table_name,
partition_name, database_name, cluster_name=cluster_name)
if partition:
return partition[palo_job.PartitionInfo.ReplicationNum]
else:
return None
def get_partition_buckets(self, table_name, partition_name,
database_name=None, cluster_name=None):
"""
        get the partition's bucket number.
        Parameters:
            database_name
            partition_name
        Returns:
            None: if the partition does not exist
"""
partition = self.get_partition(table_name, partition_name,
database_name, cluster_name=cluster_name)
if partition:
return partition[palo_job.PartitionInfo.Buckets]
else:
return None
def delete(self, table_name, delete_conditions,
partition_name=None, database_name=None, is_wait=False):
"""
        Delete rows from the given table by condition.
        Parameters:
            delete_conditions: either a list of conditions, each a 3-tuple of
                (column name, comparison operator, value), or a plain string, e.g.:
                [("k1", "=", "0"), ("k2", "<=", "10")]
        Returns:
            True: execution succeeded
            False: execution failed
        Raises:
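        Example (illustrative):
            client.delete('table_1', [('k1', '=', '0'), ('k2', '<=', '10')])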
"""
database_name = self.database_name if database_name is None else database_name
if not partition_name:
sql = "DELETE FROM %s.%s WHERE" % (database_name, table_name)
else:
sql = "DELETE FROM %s.%s PARTITION %s WHERE" % (database_name,
table_name, partition_name)
if isinstance(delete_conditions, list):
for where_condition in delete_conditions:
                # quote the value so a datetime containing a space does not break parsing
where_condition = list(where_condition)
where_condition[2] = "\"%s\"" % where_condition[2]
sql = "%s %s AND" % (sql, " ".join(where_condition))
sql = sql[:-4]
elif isinstance(delete_conditions, str):
sql = "%s %s" % (sql, delete_conditions)
        # DELETE is synchronous: once it returns, the rows are deleted; until then,
        # other connections see the job as DELETING in SHOW DELETE
time.sleep(1)
ret = self.execute(sql)
LOG.info(L("DELETE EXECUTE SUCC", ret=ret))
if is_wait is True:
if database_name is not None:
line = 'SHOW DELETE FROM %s' % database_name
else:
line = 'SHOW DELETE'
delete_job = self.execute(line)
for job in delete_job:
if job[4] == 'FINISHED':
pass
else:
time.sleep(1)
return ret == ()
def show_delete(self, database_name=None):
"""show delete """
if not database_name:
database_name = self.database_name
sql = 'SHOW DELETE FROM %s' % database_name
ret = self.execute(sql)
return ret
def schema_change(self, table_family_name, add_column_list=None,
drop_column_list=None, modify_column_list=None, order_column_list=None,
bloom_filter_column_list=None, database_name=None,
colocate_with_list=None, distribution_type=None,
is_wait=False, cluster_name=None, comment=None, replication_allocation=None):
"""schema change
        Parameters:
            table_family_name:
            rollup_table_name:
            add_column_list: columns to add; each element describes one new column:
                "col_name type agg_type default 'value' after 'col_name' in 'rollup_table'"
                e.g. ["addc1 int default '1'",
                      "addc2 float sum default '2.3'",
                      "addc3 int default '2' after k1",
                      "addc1 int default '1' in 'rollup_table'"]
            drop_column_list: columns to drop; each element is a column name
                e.g. ["k1", "v"]
            modify_column_list: columns to modify; each element is one column, see add_column_list
            order_column_list: new column order; each element is a column name
            bloom_filter_column_list:
            database_name:
            colocate_with_list:
            distribution_type:
            is_wait:
            cluster_name:
            comment: table comment
            replication_allocation: replica resource tag
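        Example (illustrative):
            client.schema_change('table_1',
                                 add_column_list=["addc1 int default '1'"],
                                 drop_column_list=['v2'],
                                 is_wait=True)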
"""
database_name = self.database_name if database_name is None else database_name
sql = "ALTER TABLE %s.%s" % (database_name, table_family_name)
if add_column_list:
if len(add_column_list) == 1:
sql = "%s ADD COLUMN %s" % (sql, ", ".join(add_column_list))
else:
sql = "%s ADD COLUMN (%s)" % (sql, ", ".join(add_column_list))
if drop_column_list:
sql = "%s DROP COLUMN %s" % (sql, ", DROP COLUMN ".join(drop_column_list))
if order_column_list:
if len(order_column_list) == 1:
sql = "%s ORDER BY %s" % (sql, ", ".join(order_column_list))
else:
sql = "%s ORDER BY (%s)" % (sql, ", ".join(order_column_list))
if modify_column_list:
sql = "%s MODIFY COLUMN %s" % (sql, ", MODIFY COLUMN ".join(modify_column_list))
if bloom_filter_column_list:
if add_column_list is None and drop_column_list is None \
and modify_column_list is None and order_column_list is None:
sql = '%s SET ("bloom_filter_columns"="%s")' % (sql, ",".join(bloom_filter_column_list))
else:
sql = '%s PROPERTIES ("bloom_filter_columns"="%s")' % (sql, ",".join(bloom_filter_column_list))
if colocate_with_list:
sql = '%s SET ("colocate_with"="%s")' % (sql, ",".join(colocate_with_list))
if distribution_type:
sql = '%s SET ("distribution_type"="%s")' % (sql, ",".join(distribution_type))
if comment:
sql = '%s MODIFY comment "%s"' % (sql, comment)
if replication_allocation:
sql = '%s SET ("replication_allocation"="%s")' % (sql, replication_allocation)
result = self.execute(sql)
if result != ():
LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name,
table_family_name=table_family_name))
return False
if is_wait:
LOG.info(L("wait for SCHEMA CHANGE.", database_name=database_name,
table_family_name=table_family_name))
if not self.wait_table_schema_change_job(table_family_name, cluster_name=cluster_name):
LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name,
table_family_name=table_family_name))
return False
return True
def schema_change_add_column(self, table_name, column_list, after_column_name=None,
to_table_name=None, force_alter=False, database_name=None,
is_wait_job=False, is_wait_delete_old_schema=False,
cluster_name=None, set_null=False):
"""
        Add columns.
        column_list: (column_name, column_type, aggtype, default_value),
        same as in create_table
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s ADD COLUMN' % (database_name, table_name)
if len(column_list) > 1:
sql = '%s(' % sql
for column in column_list:
sql = '%s %s, ' % (sql, util.column_to_sql(column, set_null))
sql = sql.rstrip(', ')
if len(column_list) > 1:
sql = '%s)' % sql
if after_column_name:
if after_column_name == 'FIRST':
sql = '%s %s' % (sql, after_column_name)
else:
sql = '%s AFTER %s' % (sql, after_column_name)
if to_table_name:
sql = '%s TO %s' % (sql, to_table_name)
if force_alter:
sql = '%s PROPERTIES("force_alter"="true")' % sql
result = self.execute(sql)
if result != ():
LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name,
table_name=table_name))
return False
if is_wait_job:
if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name):
return False
return True
def schema_change_drop_column(self, table_name, column_name_list,
from_table_name=None, force_alter=False, database_name=None,
is_wait_job=False, is_wait_delete_old_schema=False, cluster_name=None):
"""
        Drop columns.
        column_name_list: ['k1', 'v1', 'v3']
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s' % (database_name, table_name)
from_table_sql = ''
if from_table_name:
from_table_sql = 'FROM %s' % (from_table_name)
force_alter_sql = ''
if force_alter:
force_alter_sql = 'PROPERTIES("force_alter"="true")'
for column in column_name_list:
sql = '%s DROP COLUMN %s %s %s, ' % (sql, column, from_table_sql, force_alter_sql)
sql = sql.rstrip(', ')
result = self.execute(sql)
if result != ():
LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name,
table_name=table_name))
return False
if is_wait_job:
if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name):
return False
return True
def schema_change_order_column(self, table_name, column_name_list,
from_table_name=None, force_alter=False, database_name=None,
is_wait_job=False, is_wait_delete_old_schema=False, cluster_name=None):
"""
        Reorder columns.
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s' % (database_name, table_name)
from_table_sql = ''
if from_table_name:
from_table_sql = 'FROM %s' % (from_table_name)
force_alter_sql = ''
if force_alter:
force_alter_sql = 'PROPERTIES("force_alter"="true")'
sql = '%s ORDER BY (%s)' % (sql, ', '.join(column_name_list))
sql = '%s %s %s' % (sql, from_table_sql, force_alter_sql)
result = self.execute(sql)
if result != ():
LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name,
table_name=table_name))
return False
if is_wait_job:
if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name):
return False
return True
def schema_change_modify_column(self, table_name, column_name, column_type,
after_column_name=None, from_table_name=None, force_alter=False,
database_name=None, is_wait_job=False,
is_wait_delete_old_schema=False, cluster_name=None, aggtype='', column_info=""):
"""
        Modify a column's type.
"""
database_name = self.database_name if database_name is None else database_name
schema = self.desc_table(table_name, database_name=database_name)
if not aggtype:
for field in schema:
if field[0] == column_name and field[3] == 'false':
aggtype = field[5].split(',')[0]
break
if aggtype == '-' or aggtype == 'NONE':
aggtype = ''
sql = 'ALTER TABLE %s.%s MODIFY COLUMN %s %s %s %s' % (database_name,
table_name, column_name, column_type, aggtype,
column_info)
if after_column_name:
sql = '%s AFTER %s' % (sql, after_column_name)
if from_table_name:
sql = '%s FROM %s' % (sql, from_table_name)
if force_alter:
sql = '%s PROPERTIES("force_alter"="true")' % sql
result = self.execute(sql)
if result != ():
LOG.info(L("SCHEMA CHANGE fail.", database_name=database_name,
table_name=table_name))
return False
if is_wait_job:
if not self.wait_table_schema_change_job(table_name, cluster_name=cluster_name):
return False
return True
def wait_table_schema_change_job(self, table_name, database_name=None, cluster_name=None,
state="FINISHED", timeout=1200):
"""
        Wait for the schema change job to reach the given state.
"""
database_name = self.database_name if database_name is None else database_name
try_times = 0
while try_times < 120 and timeout > 0:
time.sleep(3)
timeout -= 3
schema_change_job_list = self.get_table_schema_change_job_list(
table_name, database_name, cluster_name=cluster_name)
if not schema_change_job_list or len(schema_change_job_list) == 0:
try_times += 1
continue
last_job_state = palo_job.SchemaChangeJob(schema_change_job_list[-1]).get_state()
LOG.info(L("GET LAST SCHEMA CHANGE JOB STATE", state=last_job_state))
if last_job_state == state:
LOG.info(L("GET SCHEMA CHANGE JOB STATE.", database_name=database_name,
state=palo_job.SchemaChangeJob(schema_change_job_list[-1]).get_state()))
return True
if last_job_state == 'CANCELLED' and state != 'CANCELLED':
LOG.info(L("SCHEMA CHANGE fail.", state='CANCELLED',
msg=palo_job.SchemaChangeJob(schema_change_job_list[-1]).get_msg()))
return False
if state != 'FINISHED' and last_job_state == 'FINISHED':
LOG.info(L("SCHEMA CHANGE FINISHED.", state='FINISHED'))
return False
LOG.warning(L("WAIT SCHEMA CHANGE TIMEOUT.", database_name=database_name))
return False
def get_table_schema_change_job_list(self, table_name, database_name=None, cluster_name=None):
"""
        Get all schema change jobs of the given table.
"""
database_name = self.database_name if database_name is None else database_name
sql = 'SHOW ALTER TABLE COLUMN FROM %s' % database_name
database_schema_change_job_list = self.__execute_and_rebuild_meta_class(sql, palo_job.SchemaChangeJob)
table_schema_change_job_list = []
for schema_change_job in database_schema_change_job_list:
if palo_job.SchemaChangeJob(schema_change_job).get_table_name() == table_name:
table_schema_change_job_list.append(schema_change_job)
return table_schema_change_job_list
def cancel_schema_change(self, table_name, database_name=None):
"""
        Cancel a schema change job.
"""
database_name = self.database_name if database_name is None else database_name
sql = 'CANCEL ALTER TABLE COLUMN FROM %s.%s' % (database_name, table_name)
ret = self.execute(sql)
if ret == ():
LOG.info(L("CANCEL SCHEMA CHANGE Succ.", database_name=database_name,
table_name=table_name))
return True
else:
LOG.info(L("CANCEL SCHEMA CHANGE Fail.", database_name=database_name,
table_name=table_name))
return False
def add_partition(self, table_name, partition_name, value,
distribute_type=None, bucket_num=None,
storage_medium=None, storage_cooldown_time=None,
database_name=None, partition_type=None):
"""
        Add a partition.
        When adding a partition, only the table's existing bucketing method is
        supported (no new distribution type); the bucket number may be changed.
Args:
table_name: table name
partition_name: new partition name
value: str or tuple, like: 'k1' or ('k1', 'k2'), (('1','2'),('3','4'))
distribute_type: only hash, do not support random
bucket_num: num
storage_medium:
storage_cooldown_time:
database_name:
Returns:
True/except
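        Example (illustrative):
            # RANGE partition p1 holds values less than "100"
            client.add_partition('table_1', 'p1', '100', bucket_num=10)
            # LIST partition p2 holds the enumerated values
            client.add_partition('table_1', 'p2', ('1', '2'), partition_type='LIST')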
"""
database_name = self.database_name if database_name is None else database_name
predicate = 'IN' if partition_type is not None and partition_type.upper() == 'LIST' else 'LESS THAN'
if value != 'MAXVALUE':
            # single-column LIST or multi-column RANGE
if isinstance(value, tuple) and isinstance(value[0], str):
value = '(%s)' % ','.join('"{0}"'.format(v) for v in value)
value = value.replace('"MAXVALUE"', 'MAXVALUE')
            # multi-column LIST
elif isinstance(value, tuple) and isinstance(value[0], tuple):
in_val_list = []
for multi_col_value in value:
in_val_list.append('(%s)' % ','.join('"{0}"'.format(v) for v in multi_col_value))
value = '(%s)' % ','.join(in_val_list)
            # single-column RANGE
else:
value = '("%s")' % (value)
sql = 'ALTER TABLE %s.%s ADD PARTITION %s VALUES %s %s' % (
database_name, table_name, partition_name, predicate, value)
sql = '%s (' % (sql)
if storage_medium is not None:
sql = '%s "storage_medium"="%s",' % (sql, storage_medium)
if storage_cooldown_time is not None:
sql = '%s "storage_cooldown_time"="%s",' % (sql, storage_cooldown_time)
        if sql.endswith(' ('):
            # no properties were appended; drop the dangling ' ('
            sql = sql[:-2]
else:
sql = '%s )' % (sql.rstrip(','))
if distribute_type:
if distribute_type.upper() == 'RANDOM':
discol = self.execute('desc %s.%s' % (database_name, table_name))[0][0]
sql = '%s DISTRIBUTED BY HASH(%s)' % (sql, discol)
else:
sql = '%s DISTRIBUTED BY %s' % (sql, distribute_type)
if bucket_num:
sql = '%s BUCKETS %d' % (sql, bucket_num)
result = self.execute(sql)
if result != ():
LOG.info(L("ADD PARTITION fail.", database_name=database_name,
table_name=table_name))
return False
return True
def modify_partition(self, table_name, partition_name=None,
storage_medium=None, storage_cooldown_time=None, replication_num=None,
database_name=None, **kwargs):
"""
        Modify a partition's storage_medium, storage_cooldown_time and replication_num.
        For a single-partition table, partition_name is the same as the table name.
        If partition_name is None, table-level properties are modified instead.
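        Example (illustrative):
            client.modify_partition('table_1', 'p1', storage_medium='HDD',
                                    storage_cooldown_time='9999-12-31 00:00:00')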
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s' % (database_name, table_name)
if partition_name is not None:
if isinstance(partition_name, list):
partition_name = '(' + ','.join(partition_name) + ')'
sql = '%s MODIFY PARTITION %s' % (sql, partition_name)
if storage_medium is not None:
kwargs['storage_medium'] = storage_medium
if storage_cooldown_time is not None:
kwargs['storage_cooldown_time'] = storage_cooldown_time
if replication_num is not None:
kwargs['replication_num'] = replication_num
        # avoid shadowing the built-in property
        prop = ''
        for k, v in kwargs.items():
            prop += ' "%s" = "%s",' % (k, v)
        property_str = prop.strip(',')
sql = '%s SET(%s)' % (sql, property_str)
result = self.execute(sql)
if result != ():
LOG.info(L("MODIFY PARTITION fail.", database_name=database_name,
table_name=table_name))
return False
return True
def drop_partition(self, table_name, partition_name, database_name=None):
"""
        Drop a partition.
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s DROP PARTITION %s' % ( \
database_name, table_name, partition_name)
try:
self.execute(sql)
except PaloException as e:
            LOG.error(L('DROP PARTITION fail.', fe=str(self), error=e))
return False
return True
def add_temp_partition(self, table_name, partition_name, value, distribute_type=None, bucket_num=None,
in_memory=None, replication_num=None, database_name=None, partition_type='RANGE'):
"""
        Add a temporary partition.
:param table_name:
:param partition_name:
:param value:
:param distribute_type:
:param bucket_num:
:param in_memory:
:param replication_num:
:param database_name:
:return:
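        Example (illustrative):
            client.add_temp_partition('table_1', 'tp1', '100', bucket_num=10)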
"""
database_name = self.database_name if database_name is None else database_name
partitioninfo = PartitionInfo(partition_type=partition_type).get_partition_value(value)
        LOG.debug(L('temporary partition info', partition_info=partitioninfo))
sql = 'ALTER TABLE %s.%s ADD TEMPORARY PARTITION %s %s' % (database_name, table_name,
partition_name, partitioninfo)
sql = '%s (' % (sql)
if in_memory is not None:
sql = '%s "in_memory"="%s",' % (sql, in_memory)
if replication_num is not None:
sql = '%s "replication_num"="%s",' % (sql, replication_num)
        if sql.endswith(' ('):
            # no properties were appended; drop the dangling ' ('
            sql = sql[:-2]
else:
sql = '%s )' % (sql.rstrip(','))
if distribute_type:
if distribute_type.upper() == 'RANDOM':
discol = self.execute('desc %s.%s' % (database_name, table_name))[0][0]
sql = '%s DISTRIBUTED BY HASH(%s)' % (sql, discol)
else:
sql = '%s DISTRIBUTED BY %s' % (sql, distribute_type)
if bucket_num:
sql = '%s BUCKETS %d' % (sql, bucket_num)
result = self.execute(sql)
if result != ():
LOG.info(L("ADD TEMP PARTITION fail.", database_name=database_name,
table_name=table_name))
return False
return True
def drop_temp_partition(self, database_name, table_name, partition_name):
"""
        Drop a temporary partition.
:param database_name:
:param table_name:
:param partition_name:
:return:
"""
self.use(database_name)
sql = 'ALTER TABLE %s DROP TEMPORARY PARTITION %s' % (table_name, partition_name)
result = self.execute(sql)
if result != ():
LOG.info(L("DROP TEMP PARTITION fail.", database_name=database_name,
table_name=table_name))
return False
return True
def modify_temp_partition(self, database_name, table_name, target_partition_list, temp_partition_list,
strict_range=None, use_temp_partition_name=None):
"""
        Replace target partitions with temporary partitions.
:param database_name:
:param table_name:
:param target_partition_list:
:param temp_partition_list:
:param strict_range:
:param use_temp_partition_name:
:return:
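        Example (illustrative):
            client.modify_temp_partition('test_db', 'table_1', ['p1'], ['tp1'],
                                         strict_range='false')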
"""
self.use(database_name)
target_partition = ','.join(target_partition_list)
temp_partition = ','.join(temp_partition_list)
sql = 'ALTER TABLE %s.%s REPLACE PARTITION (%s) WITH TEMPORARY PARTITION (%s)' % \
(database_name, table_name, target_partition, temp_partition)
if strict_range is not None or use_temp_partition_name is not None:
sql = "%s PROPERTIES (" % sql
value_list = []
            LOG.debug(L('replace partition strict_range', strict_range=strict_range))
if strict_range:
value_list.append('"strict_range"="%s"' % strict_range)
if use_temp_partition_name:
value_list.append('"use_temp_partition_name"="%s"' % use_temp_partition_name)
            LOG.debug(L('replace partition properties', value_list=value_list))
sql = "%s %s)" % (sql, ','.join(value_list))
result = self.execute(sql)
if result != ():
LOG.info(L("ALTER TEMP PARTITION fail.", database_name=database_name,
table_name=table_name))
return False
return True
def create_user(self, user, password=None, is_superuser=False, default_role=None):
"""
create user
"""
sql = "CREATE USER '%s'" % user
if password is not None:
sql = "%s IDENTIFIED BY '%s'" % (sql, password)
if is_superuser:
sql = "%s SUPERUSER" % sql
if default_role:
sql = "%s DEFAULT ROLE '%s'" % (sql, default_role)
result = self.execute(sql)
if result != ():
LOG.error(L("CREATE USER fail.", user=user, password=password, \
is_superuser=is_superuser))
return False
return True
def create_role(self, role_name):
"""
create role
"""
sql = 'CREATE ROLE %s' % role_name
result = self.execute(sql)
if result != ():
LOG.error(L("CREATE ROLE fail.", sql=sql, msg=str(result)))
return False
return True
def drop_role(self, role):
"""drop role"""
sql = "DROP ROLE %s" % role
result = self.execute(sql)
if result != ():
LOG.error(L("DROP ROLE fail", msg=str(result), role=role))
return False
return True
def drop_user(self, user, if_exists=True):
"""
drop user
"""
if if_exists:
sql = "DROP USER IF EXISTS '%s'" % user
else:
sql = "DROP USER '%s'" % user
result = self.execute(sql)
if result != ():
LOG.error(L("DROP USER fail.", user=user))
return False
return True
def clean_user(self, user):
"""clean user
"""
try:
self.drop_user(user)
        except Exception:
pass
def set_password(self, user=None, password=None):
"""
set password of user to password
"""
if user is None:
user = self.user
if password is None:
password = ''
sql = "SET PASSWORD FOR '%s' = PASSWORD('%s')" % (user, password)
result = self.execute(sql)
if result != ():
LOG.error(L("SET PASSWORD fail.", user=user, password=password))
return False
return True
def alter_user(self, user, cpu_share=None, max_user_connections=None):
"""
alter user
"""
sql = "ALTER USER '%s'" % user
if cpu_share is not None:
sql = "%s MODIFY RESOURCE CPU_SHARE %d," % (sql, cpu_share)
if max_user_connections is not None:
sql = "%s MODIFY PROPERTY MAX_USER_CONNECTIONS %d," % (sql, max_user_connections)
sql = sql.rstrip(",")
result = self.execute(sql)
if result != ():
LOG.error(L("ALTER USER fail.", user=user, cpu_share=cpu_share,
max_user_connections=max_user_connections))
return False
return True
def get_cpu_share(self, user):
"""
get cpu share
"""
result = self.show_resource(user)
for info in result:
if info[0] == user and info[1] == "CPU_SHARE":
return int(info[2])
return None
def show_resource(self, user):
"""
show resource
"""
sql = "SHOW RESOURCE LIKE '%s'" % user
result = self.execute(sql)
if result == ():
LOG.warning(L("SHOW RESOURCE fail.", user=user))
return result
def grant(self, user, privilege_list, database='', table='', catalog='',
resource=None, is_role=False, identity='%'):
"""
grant
GRANT privilege_list ON grant_obj TO [user_identity] | [ROLE role_name]
GRANT privilege_list ON RESOURCE grant_obj TO [user_identity] | [ROLE role_name]
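        Example (illustrative):
            client.grant('test_user', ['SELECT_PRIV'], database='test_db', table='table_1')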
"""
sql = "GRANT {privilege_list} ON {grant_obj} TO {role_desc}{user}"
if isinstance(privilege_list, list):
privilege_list = ', '.join(privilege_list)
if is_role:
role_desc = "ROLE "
user = '"%s"' % user
else:
role_desc = ''
user = '"%s"@"%s"' % (user, identity)
if resource:
grant_obj = 'RESOURCE "%s"' % resource
else:
grant_obj = "%s.%s.%s" % (catalog, database, table)
grant_obj = grant_obj.strip(".")
result = self.execute(sql.format(privilege_list=privilege_list, grant_obj=grant_obj,
role_desc=role_desc, user=user))
if result != ():
LOG.error(L("GRANT fail", user=user, privilege=privilege_list,
database_name=database))
return False
return True
def revoke(self, user, privilege_list, database='', table='', catalog='',
resource=None, is_role=False):
"""
revoke
REVOKE privilege_list ON db_name[.tbl_name] FROM [user_identity] | [ROLE role_name]
REVOKE privilege_list ON RESOURCE resource_name FROM [user_identity] | [ROLE role_name]
"""
if isinstance(privilege_list, list):
privilege_list = ', '.join(privilege_list)
sql = "REVOKE {privilege_list} ON {revoke_obj} FROM {role_desc}{user}"
if is_role:
role_desc = 'ROLE '
user = '"%s"' % user
else:
role_desc = ''
if resource is None:
revoke_obj = "%s.%s.%s" % (catalog, database, table)
revoke_obj = revoke_obj.strip('.')
else:
revoke_obj = 'RESOURCE %s' % resource
        sql = sql.format(privilege_list=privilege_list, revoke_obj=revoke_obj,
                         role_desc=role_desc, user=user)
        result = self.execute(sql)
        if result != ():
            LOG.error(L("REVOKE fail", sql=sql, msg=str(result)))
return False
return True
def get_grant(self, user=None, all=None):
"""show grant"""
if user is None:
sql = 'SHOW GRANTS'
else:
sql = 'SHOW GRANTS FOR `%s`' % user
if all:
sql = 'SHOW ALL GRANTS'
result = self.__execute_and_rebuild_meta_class(sql, palo_job.GrantInfo)
return result
def is_master(self):
"""
is master
deprecated
"""
sql = "SHOW FRONTENDS"
fe_list = self.execute(sql)
for fe in fe_list:
if fe[palo_job.FrontendInfo.Host] == socket.gethostbyname(self.host):
return fe[palo_job.FrontendInfo.IsMaster] == "true"
return False
def get_alive_backend_list(self):
"""
        get alive backends
"""
backend_list = self.get_backend_list()
result = list()
for backend in backend_list:
if palo_job.BackendProcInfo(backend).get_alive() == "true":
result.append(backend)
LOG.info(L("GET ALIVE BACKEND", alive_be=result))
return result
def get_backend(self, backend_id):
"""
        get backend by backend id
"""
backend_list = self.get_backend_list()
for backend in backend_list:
if palo_job.BackendProcInfo(backend).get_backend_id() == str(backend_id):
return backend
LOG.warning(L("Get no backend by backend id", backend_id=backend_id))
return None
def get_backend_heartbeat_port(self, value=None, idx=None):
"""get be hearbeat port"""
be_list = self.get_backend_list()
if value is None:
port = palo_job.BackendProcInfo(be_list[0]).get_heartbeatport()
else:
port = util.get_attr_condition_value(be_list, idx, value,
palo_job.BackendProcInfo.HeartbeatPort)
return port
def get_backend_list(self):
"""
        get backend list
"""
sql = "SHOW BACKENDS"
result = self.__execute_and_rebuild_meta_class(sql, palo_job.BackendProcInfo)
return result
def get_backend_id_list(self):
"""get backend id list"""
backend_list = self.get_backend_list()
id_list = list()
for be in backend_list:
id_list.append(be[0])
return id_list
def get_be_hostname_by_id(self, be_id):
"""
        get backend hostname by id
"""
be = self.get_backend(be_id)
if not be:
return None
return palo_job.BackendProcInfo(be).get_hostname()
def get_backend_host_list(self):
"""
        return IPs of alive backends
"""
backend_list = self.get_alive_backend_list()
be_host_list = [palo_job.BackendProcInfo(backend).get_ip() for backend in backend_list]
return tuple(be_host_list)
def get_backend_host_ip(self):
"""get all be host ip"""
res = []
backend_list = self.get_backend_list()
for backend in backend_list:
cur_ip = palo_job.BackendProcInfo(backend).get_ip()
res.append(cur_ip)
return res
def get_backend_host_name(self):
"""get all be host name"""
backend_list = self.get_backend_list()
return util.get_attr(backend_list, palo_job.BackendProcInfo.Host)
def get_backend_host_port_list(self):
"""
        return host:port of alive backends
metadata changed
backends[3] is be hostname, backends[4] is be heartbeat port
"""
backend_list = self.get_alive_backend_list()
backend_host_port_list = []
for backend in backend_list:
backend_info = palo_job.BackendProcInfo(backend)
backend_host_port = '%s:%s' % (backend_info.get_hostname(),
backend_info.get_heartbeatport())
backend_host_port_list.append(backend_host_port)
return tuple(backend_host_port_list)
def add_backend_list(self, backend_list):
"""
        add backends
        disabled
"""
if not isinstance(backend_list, list):
backend_list = [backend_list, ]
for backend in backend_list:
sql = 'ALTER SYSTEM ADD BACKEND "%s"' % (backend)
result = self.execute(sql)
if result != ():
LOG.info(L('ADD BACKEND FAIL', backend=backend))
return False
return True
def add_backend(self, host_name, port, tag_location=None):
"""
        add backend
"""
sql = 'ALTER SYSTEM ADD BACKEND "%s:%s"' % (host_name, port)
if tag_location is not None:
sql = '%s PROPERTIES("tag.location"="%s")' % (sql, tag_location)
result = self.execute(sql)
time.sleep(2)
if result != ():
LOG.info(L('ADD BACKEND FAIL', backend=host_name, port=port, tag_location=tag_location))
return False
LOG.info(L('ADD BACKEND SUCCESS', backend=host_name, port=port, tag_location=tag_location))
return True
def drop_backend_list(self, backend_list):
"""
        drop backends
"""
if not isinstance(backend_list, list):
backend_list = [backend_list, ]
for backend in backend_list:
sql = 'ALTER SYSTEM DROPP BACKEND "%s"' % (backend)
result = self.execute(sql)
if result != ():
LOG.info(L('DROP BACKEND FAIL', backend=backend))
return False
time.sleep(2)
return True
def decommission_backend_list(self, backend_list):
"""be下线"""
if not isinstance(backend_list, list):
backend_list = [backend_list, ]
for backend in backend_list:
sql = 'ALTER SYSTEM DECOMMISSION BACKEND "%s"' % (backend)
result = self.execute(sql)
if result != ():
LOG.info(L('DECOMMISSION BACKEND FAIL', backend=backend))
return False
return True
def add_fe_list(self, fe_list, type='OBSERVER'):
"""
        add FEs
type: OBSERVER, REPLICA
"""
if not isinstance(fe_list, list):
fe_list = [fe_list, ]
for fe in fe_list:
sql = 'ALTER SYSTEM ADD %s "%s"' % (type, fe)
result = self.execute(sql)
if result != ():
LOG.info(L('ADD FE FAILED', fe=fe))
return False
return True
def drop_fe_list(self, fe_list, type='OBSERVER'):
"""
        drop FEs
type: OBSERVER, REPLICA
"""
if not isinstance(fe_list, list):
fe_list = [fe_list, ]
for fe in fe_list:
sql = 'ALTER SYSTEM DROP %s "%s"' % (type, fe)
result = self.execute(sql)
if result != ():
LOG.info(L('DROP FE FAILED', fe=fe))
return False
return True
def get_fe_list(self):
"""
        get FE list
"""
sql = "SHOW FRONTENDS"
result = self.execute(sql)
return result
def get_fe_host_port_list(self, type=None):
"""
        return host:port of FEs
type: OBSERVER, REPLICA
"""
fe_list = self.get_fe_list()
fe_host_port_list = []
for fe in fe_list:
if type is not None and fe[2] != type:
continue
fe_host_port = '%s:%s' % (fe[0], fe[1])
fe_host_port_list.append(fe_host_port)
return tuple(fe_host_port_list)
def get_master(self):
"""
        return master host:port
"""
fe_list = self.get_fe_list()
for fe in fe_list:
fe_info = palo_job.FrontendInfo(fe)
if fe_info.get_ismaster() == "true":
fe_host_port = "%s:%s" % (fe_info.get_host(), fe_info.get_httpport())
return fe_host_port
return None
def get_master_host(self):
"""
        return master host
"""
fe_list = self.get_fe_list()
for fe in fe_list:
fe_info = palo_job.FrontendInfo(fe)
if fe_info.get_ismaster() == "true":
return fe_info.get_host()
return None
def get_fe_LastHeartbeat(self, fe_ip):
"""
        return LastHeartbeat of the FE with the given ip
"""
fe_list = self.get_fe_list()
for fe in fe_list:
if palo_job.FrontendInfo(fe).get_IP() == str(fe_ip):
                return palo_job.FrontendInfo(fe).get_LastHeartbeat()
LOG.warning(L("Get no fe by fe id", fe_id=fe_ip))
return None
def get_fe_host(self):
"""get fe host list"""
fe_list = self.get_fe_list()
fe_host = list()
for fe in fe_list:
fe_host.append(fe[1])
return fe_host
def recover_database(self, database_name):
"""
        recover database
"""
sql = "RECOVER DATABASE %s" % database_name
try:
self.execute(sql)
except PaloException as e:
LOG.error(L("recover database error", fe=str(self), database_name=database_name, \
error=e))
return False
return True
def recover_table(self, table_name, database_name=None):
"""
        recover table
"""
database_name = self.database_name if database_name is None else database_name
sql = "RECOVER TABLE "
if database_name is not None:
sql = '%s%s.' % (sql, database_name)
sql = '%s%s' % (sql, table_name)
try:
self.execute(sql)
except PaloException as e:
LOG.error(L("recover table error", fe=str(self), database_name=database_name, \
table_name=table_name, error=e))
return False
return True
def recover_partition(self, table_name, partition_name, database_name=None):
"""
        recover partition
"""
database_name = self.database_name if database_name is None else database_name
sql = "RECOVER PARTITION %s FROM " % (partition_name)
if database_name is not None:
sql = '%s%s.' % (sql, database_name)
sql = '%s%s' % (sql, table_name)
try:
self.execute(sql)
except PaloException as e:
LOG.error(L("recover partition error", fe=str(self), database_name=database_name, \
table_name=table_name, partition_name=partition_name, error=e))
return False
return True
def rename_database(self, new_database_name, old_database_name=None):
"""
        rename database
"""
if old_database_name is None:
old_database_name = self.database_name
sql = 'ALTER DATABASE %s RENAME %s' % (old_database_name, new_database_name)
try:
self.execute(sql)
except PaloException as e:
LOG.error(L("rename database error", fe=str(self),
new_database_name=new_database_name, error=e))
return False
return True
def rename_table(self, new_table_name, old_table_name, database_name=None):
"""
        rename table
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s RENAME %s' % (database_name, old_table_name, new_table_name)
try:
self.execute(sql)
except PaloException as e:
LOG.error(L("rename table error", fe=str(self), error=e))
return False
return True
def rename_rollup(self, new_index_name, old_index_name, table_name, database_name=None):
"""
        rename rollup index
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s RENAME ROLLUP %s %s' % (database_name, table_name,
old_index_name, new_index_name)
try:
self.execute(sql)
except PaloException as e:
LOG.error(L("rename rollup error", fe=str(self), error=e))
return False
return True
def rename_partition(self, new_partition_name, old_partition_name,
table_name, database_name=None):
"""
        rename partition
"""
database_name = self.database_name if database_name is None else database_name
sql = 'ALTER TABLE %s.%s RENAME PARTITION %s %s' % (database_name, table_name,
old_partition_name, new_partition_name)
try:
self.execute(sql)
except PaloException as e:
LOG.error(L("rename partition error", fe=str(self), error=e))
return False
return True
def show_databases(self, database_name=None):
"""show databases
"""
sql = 'SHOW DATABASES'
if database_name:
sql = 'SHOW DATABASES LIKE "%s"' % database_name
return self.execute(sql)
def show_tables(self, table_name=None):
"""show tables
"""
sql = 'SHOW TABLES'
if table_name:
sql = 'SHOW TABLES LIKE "%s"' % table_name
return self.execute(sql)
def show_partitions(self, table_name, database_name=None):
"""show partitions
"""
database_name = self.database_name if database_name is None else database_name
sql = 'SHOW PARTITIONS FROM %s.%s' % (database_name, table_name)
        result = self.__execute_and_rebuild_meta_class(sql, palo_job.PartitionInfo)
        return result
def show_loading_job_state(self, database_name=None, state='LOADING'):
"""show loading state job"""
ret = self.show_load(database_name=database_name, state=state)
return ret
def show_load(self, database_name=None, label=None, state=None, order_by=None, limit=None,
offset=None):
"""
SHOW LOAD
[FROM db_name]
[
WHERE
[LABEL [ = "your_label" | LIKE "label_matcher"]]
[STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];
Returns:
load_job
"""
        sql = 'SHOW LOAD'
        if database_name:
            sql = '%s FROM %s' % (sql, database_name)
        where_list = []
        if label:
            where_list.append('LABEL = "%s"' % label)
        if state:
            where_list.append('STATE = "%s"' % state)
        if where_list:
            sql = '%s WHERE %s' % (sql, ' AND '.join(where_list))
if order_by:
sql = '%s ORDER BY %s' % (sql, order_by)
if limit:
sql = '%s LIMIT %s' % (sql, limit)
if offset:
sql = '%s OFFSET %s' % (sql, offset)
return self.execute(sql)
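    # Usage sketch for show_load(): label and state filters are combined with AND;
    # 'client' and the label below are hypothetical.
    #   jobs = client.show_load(database_name='example_db', label='example_label',
    #                           order_by='CreateTime DESC', limit=10)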
def desc_table(self, table_name, database_name=None, is_all=False):
"""desc table"""
database_name = self.database_name if database_name is None else database_name
if is_all:
sql = 'DESC %s.%s all' % (database_name, table_name)
else:
sql = 'DESC %s.%s' % (database_name, table_name)
return self.execute(sql)
def show_schema_change_job(self, database_name=None, table_name=None, state=None):
"""show schema change job"""
database_name = self.database_name if database_name is None else database_name
where_list = []
if table_name:
where_list.append('TableName = "%s"' % table_name)
if state:
where_list.append('State= "%s"' % state)
if len(where_list) == 0:
sql = 'SHOW ALTER TABLE COLUMN FROM %s' % database_name
return self.execute(sql)
else:
sql = 'SHOW ALTER TABLE COLUMN FROM %s WHERE %s' % (database_name, ' AND '.join(where_list))
return self.execute(sql)
def show_rollup_job(self, database_name=None, table_name=None):
"""show schema change job"""
database_name = self.database_name if database_name is None else database_name
sql = 'SHOW ALTER TABLE ROLLUP FROM %s' % database_name
if table_name is None:
return self.execute(sql)
else:
sql = '%s WHERE TableName = "%s"' % (sql, table_name)
sc_job = self.execute(sql)
return sc_job
def set_properties(self, kv_list, user=None):
"""set properties
"""
for_user = ''
if user is not None:
for_user = 'FOR "%s" ' % user
properties = ', '.join(kv_list) if not isinstance(kv_list, str) else kv_list
sql = 'SET PROPERTY %s%s' % (for_user, properties)
result = self.execute(sql)
if result != ():
return False
return True
def set_max_user_connections(self, max_connections, user=None):
"""set max user connections
"""
kv = '"max_user_connections" = "%s"' % max_connections
return self.set_properties(kv, user)
def set_resource_cpu_share(self, cpu_share, user=None):
"""set resource cpu share
"""
kv = '"resource.cpu_share" = "%s"' % cpu_share
return self.set_properties(kv, user)
def set_quota_low(self, quota_low, user=None):
"""set quota low
"""
kv = '"quota.low" = "%s"' % quota_low
return self.set_properties(kv, user)
def set_quota_normal(self, quota_normal, user=None):
"""set quota normal
"""
kv = '"quota.normal" = "%s"' % quota_normal
return self.set_properties(kv, user)
def set_quota_high(self, quota_high, user=None):
"""set quota high
"""
kv = '"quota.high" = "%s"' % quota_high
return self.set_properties(kv, user)
def set_load_cluster_hadoop_palo_path(self, cluster_name, hadoop_palo_path, user=None):
"""set load cluster hadoop palo path
"""
kv = '"load_cluster.%s.hadoop_palo_path" = "%s"' % (cluster_name, hadoop_palo_path)
return self.set_properties(kv, user)
def set_load_cluster_hadoop_http_port(self, cluster_name, hadoop_http_port, user=None):
"""set load cluster hadoop http port
"""
kv = '"load_cluster.%s.hadoop_http_port" = "%s"' % (cluster_name, hadoop_http_port)
return self.set_properties(kv, user)
def set_load_cluster_hadoop_configs(self, cluster_name, hadoop_configs, user=None):
"""set load cluster hadoop configs
"""
kv = '"load_cluster.%s.hadoop_configs" = "%s"' % (cluster_name, hadoop_configs)
return self.set_properties(kv, user)
def set_load_cluster(self, cluster_name, hadoop_configs, hadoop_palo_path,
hadoop_http_port=None, user=None):
"""set load cluster
"""
kv_1 = '"load_cluster.%s.hadoop_configs" = "%s"' % (cluster_name, hadoop_configs)
kv_2 = '"load_cluster.%s.hadoop_palo_path" = "%s"' % (cluster_name, hadoop_palo_path)
kv_list = [kv_1, kv_2]
if hadoop_http_port is not None:
kv_3 = '"load_cluster.%s.hadoop_http_port" = "%s"' % (cluster_name, hadoop_http_port)
kv_list.append(kv_3)
return self.set_properties(kv_list, user)
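    # Usage sketch: register a hadoop load cluster and make it the default; the
    # cluster name, configs string and paths are hypothetical.
    #   configs = 'fs.default.name=hdfs://nn:8020;mapred.job.tracker=jt:9001'
    #   client.set_load_cluster('hadoop_cluster_1', configs, '/user/palo/palo_path',
    #                           hadoop_http_port=8070)
    #   client.set_default_load_cluster('hadoop_cluster_1')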
def set_default_load_cluster(self, cluster_name, user=None):
"""set default load cluster
"""
kv = '"default_load_cluster" = "%s"' % cluster_name
return self.set_properties(kv, user)
def remove_default_load_cluster(self, user=None):
"""remove default load cluster
"""
kv = '"default_load_cluster" = ""'
return self.set_properties(kv, user)
def remove_load_cluster(self, cluster_name, user=None):
"""remove load cluster hadoop configs
"""
kv = '"load_cluster.%s" = ""' % cluster_name
return self.set_properties(kv, user)
def remove_load_cluster_hadoop_configs(self, cluster_name, user=None):
"""remove load cluster hadoop configs
"""
kv = '"load_cluster.%s.hadoop_configs" = ""' % cluster_name
return self.set_properties(kv, user)
def remove_load_cluster_hadoop_http_port(self, cluster_name, user=None):
"""remove load cluster hadoop http port
"""
kv = '"load_cluster.%s.hadoop_http_port" = ""' % cluster_name
return self.set_properties(kv, user)
def remove_load_cluster_hadoop_palo_path(self, cluster_name, user=None):
"""remove load cluster hadoop palo path
"""
kv = '"load_cluster.%s.hadoop_palo_path" = ""' % cluster_name
return self.set_properties(kv, user)
def gen_hadoop_configs(self, fs_default_name=None, mapred_job_tracker=None,
hadoop_job_ugi=None, mapred_job_priority=None):
"""gen hadoop configs
"""
configs = ''
if fs_default_name is not None:
configs += 'fs.default.name=%s;' % (fs_default_name)
if mapred_job_tracker is not None:
configs += 'mapred.job.tracker=%s;' % (mapred_job_tracker)
if hadoop_job_ugi is not None:
configs += 'hadoop.job.ugi=%s;' % (hadoop_job_ugi)
if mapred_job_priority is not None:
configs += 'mapred.job.priority=%s;' % (mapred_job_priority)
configs = configs.rstrip(';')
return configs
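    # gen_hadoop_configs() joins the given settings with ';', e.g. (hypothetical
    # values):
    #   client.gen_hadoop_configs(fs_default_name='hdfs://nn:8020',
    #                             mapred_job_tracker='jt:9001')
    #   # -> 'fs.default.name=hdfs://nn:8020;mapred.job.tracker=jt:9001'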
def show_property(self, key=None, user=None):
"""show property
"""
for_user = '' if user is None else ' FOR "%s"' % user
property_key = '' if key is None else ' LIKE "%s" ' % key
sql = 'SHOW PROPERTY%s%s' % (for_user, property_key)
return self.execute(sql)
def show_max_user_connections(self, user=None):
"""show max user connections
((u'max_user_connections', u'10'),)
"""
result = self.show_property('max_user_connections', user)
return int(result[0][1])
def show_resource_cpu_share(self, user=None):
"""show resource cpu share
((u'resource.cpu_share', u'1000'),
(u'resource.hdd_read_iops', u'80'),
(u'resource.hdd_read_mbps', u'30'),
(u'resource.io_share', u'1000'),
(u'resource.ssd_read_iops', u'1000'),
(u'resource.ssd_read_mbps', u'30'))
"""
result = self.show_property('resource', user)
for item in result:
if item[0] == 'resource.cpu_share':
return int(item[1])
def show_quota_low(self, user=None):
"""show quota low
"""
result = self.show_property('quota', user)
for item in result:
if item[0] == 'quota.low':
return int(item[1])
def show_quota_normal(self, user=None):
"""show quota normal
"""
result = self.show_property('quota', user)
for item in result:
if item[0] == 'quota.normal':
return int(item[1])
def show_quota_high(self, user=None):
"""show quota high
"""
result = self.show_property('quota', user)
for item in result:
if item[0] == 'quota.high':
return int(item[1])
def show_load_cluster(self, user=None):
"""show load cluster
"""
result = self.show_property('%load_cluster%', user)
load_cluster_dict = {}
for k, v in result:
load_cluster_dict[k] = v
return load_cluster_dict
def add_whitelist(self, user, whitelist):
"""
add whitelist
"""
sql = 'ALTER USER %s ADD WHITELIST "%s"' % (user, whitelist)
return self.execute(sql) == ()
def delete_whitelist(self, user, whitelist):
"""
delete whitelist
"""
sql = 'ALTER USER %s DELETE WHITELIST "%s"' % (user, whitelist)
return self.execute(sql) == ()
def show_whitelist(self, user):
"""
show whitelist
"""
sql = 'SHOW WHITELIST'
result = self.execute(sql)
for r in result:
if r[0] == user:
return r[1]
return None
def clean_whitelist(self, user):
"""
clean whitelist
"""
whitelist = self.show_whitelist(user)
if whitelist is None:
return
for w in whitelist.split(','):
if w:
self.delete_whitelist(user, w)
def create_cluster(self, cluster_name, instance_num, password=''):
"""
create cluster
"""
sql = 'CREATE CLUSTER %s PROPERTIES ( "instance_num" = "%d") ' \
'IDENTIFIED BY "%s"' % (cluster_name, instance_num, password)
self.execute(sql)
def enter(self, cluster_name):
"""
        enter cluster
"""
sql = 'ENTER %s' % (cluster_name)
self.execute(sql)
self.cluster_name = cluster_name
def clean_cluster(self, cluster_name=None):
"""
clean cluster
"""
if not cluster_name:
cluster_name_list = self.get_cluster_list()
for cluster_name in cluster_name_list:
if cluster_name == 'default_cluster':
continue
self.enter(cluster_name)
self.clean()
self.drop_cluster(cluster_name)
else:
try:
self.enter(cluster_name)
self.clean()
self.drop_cluster(cluster_name)
except:
pass
def drop_cluster(self, cluster_name):
"""
drop cluster
"""
sql = 'DROP CLUSTER %s' % (cluster_name)
self.execute(sql)
def get_cluster_list(self):
"""
show cluster
"""
sql = 'SHOW CLUSTERS'
records = self.execute(sql)
return tuple([r[0] for r in records])
def alter_cluster(self, cluster_name, instance_num):
"""
alter cluster
"""
sql = 'ALTER CLUSTER %s PROPERTIES ( "instance_num" = "%d" )' \
% (cluster_name, instance_num)
self.execute(sql)
def link(self, src_cluster_name, src_database_name, dst_cluster_name, dst_database_name):
"""
link
"""
sql = 'LINK DATABASE %s.%s %s.%s' % (src_cluster_name,
src_database_name, dst_cluster_name, dst_database_name)
self.execute(sql)
def migrate(self, src_cluster_name, src_database_name, dst_cluster_name, dst_database_name):
"""
migrate
"""
sql = 'MIGRATE DATABASE %s.%s %s.%s' % (src_cluster_name,
src_database_name, dst_cluster_name, dst_database_name)
self.execute(sql)
def get_migrate_status(self, src_cluster_name, src_database_name,
dst_cluster_name, dst_database_name):
"""
        get migration progress of the specified migration
"""
sql = 'SHOW MIGRATIONS'
migrate_list = self.execute(sql)
print(migrate_list)
for it in migrate_list:
if it[1] == '%s:%s' % (src_cluster_name, src_database_name) \
and it[2] == '%s:%s' % (dst_cluster_name, dst_database_name):
return it[3]
return None
def wait_migrate(self, src_cluster_name, src_database_name,
dst_cluster_name, dst_database_name):
"""
wait migrate
"""
time.sleep(5)
ret = self.get_migrate_status(src_cluster_name, src_database_name,
dst_cluster_name, dst_database_name)
if not ret:
return False
while True:
ret = self.get_migrate_status(src_cluster_name, src_database_name,
dst_cluster_name, dst_database_name)
if ret == '100%':
return True
time.sleep(1)
def add_broker(self, broker_name, host_port):
"""
add broker
"""
sql = 'ALTER SYSTEM ADD BROKER %s "%s"' % (broker_name, host_port)
return self.execute(sql) == ()
def drop_broker(self, broker_name, host_port):
"""
        drop broker
"""
sql = 'ALTER SYSTEM DROP BROKER %s "%s"' % (broker_name, host_port)
return self.execute(sql) == ()
def get_broker_list(self):
"""
        show brokers
"""
sql = 'SHOW PROC "/brokers"'
result = self.execute(sql)
return result
def get_broker_start_update_time(self):
"""show broker_start_update_time"""
res = self.get_broker_list()
if res:
LOG.info(L("get_broker_start_update_time", start_time=palo_job.BrokerInfo(res[0]).get_last_start_time(),
update_time=palo_job.BrokerInfo(res[0]).get_last_update_time()))
return palo_job.BrokerInfo(res[0]).get_last_start_time()
return None
def drop_all_broker(self, broker_name):
"""
        drop all brokers with the given name
        """
        sql = 'ALTER SYSTEM DROP ALL BROKER %s' % broker_name
return self.execute(sql) == ()
def export(self, table_name, to_path, broker_info=None,
partition_name_list=None, property_dict=None,
database_name=None, where=None):
"""export data.
Args:
table_name: table name, str
to_path: export path, str
broker_info: broker info, BrokerInfo
            partition_name_list: name list of partitions, str list
property_dict: properties, dict str->str
database_name: database name, str
where: str, k1>0
Returns:
True if succeed
"""
sql = 'EXPORT TABLE'
if not database_name:
database_name = self.database_name
sql = '%s %s.%s' % (sql, database_name, table_name)
if partition_name_list:
sql = '%s PARTITION (%s)' % (sql, ','.join(partition_name_list))
if where:
sql = '%s WHERE %s' % (sql, where)
sql = '%s TO "%s"' % (sql, to_path)
if property_dict:
p = ''
for d in property_dict:
p = '%s "%s"="%s",' % (p, d, property_dict[d])
p = p.rstrip(",")
sql = '%s PROPERTIES(%s)' % (sql, p)
if broker_info:
sql = '%s %s' % (sql, str(broker_info))
return self.execute(sql) == ()
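    # Usage sketch for export(); the path and filters are hypothetical and
    # broker_info is expected to be a BrokerInfo instance.
    #   ok = client.export('example_tbl', 'hdfs://nn:8020/tmp/export_',
    #                      broker_info=broker, partition_name_list=['p1', 'p2'],
    #                      property_dict={'column_separator': ','}, where='k1 > 0')
    #   ok = ok and client.wait_export()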
def get_export_status(self, database_name=None):
"""get export status.
Args:
database_name: database name, str
Returns:
            the export job row (ORDER BY JOBID LIMIT 1)
"""
sql = 'SHOW EXPORT'
if database_name:
sql = '%s FROM %s' % (sql, database_name)
sql = '%s %s' % (sql, 'ORDER BY JOBID LIMIT 1')
return self.execute(sql)
def show_export(self, database_name=None, state=None, export_job_id=None,
order_by=None, limit=None):
"""
Args:
database_name: database name
            state: export status filter, PENDING|EXPORTING|FINISHED|CANCELLED
export_job_id: export job id filter
order_by: column, str 'starttime'
limit: num, int
Returns:
export job
"""
sql = 'SHOW EXPORT'
if database_name:
sql = '%s FROM %s' % (sql, database_name)
if state:
sql = '%s WHERE STATE="%s"' % (sql, state)
if export_job_id:
sql = '%s AND ID="%s"' % (sql, export_job_id)
elif export_job_id:
sql = '%s WHERE ID="%s"' % (sql, export_job_id)
if order_by:
sql = '%s ORDER BY %s' % (sql, order_by)
if limit:
sql = '%s LIMIT %s' % (sql, limit)
return self.execute(sql)
def wait_export(self, database_name=None):
"""wait export.
Args:
database_name: database name, str
Returns:
True if succeed
"""
timeout = 1200
while timeout > 0:
r = self.get_export_status(database_name)
export_job = palo_job.ExportJob(r[0])
if export_job.get_state() == 'FINISHED':
LOG.info(L("export succ.", state='FINISHED', database_name=database_name))
return True
elif export_job.get_state() == 'CANCELLED':
LOG.warning(L("export fail.", state='CANCELLED', database_name=database_name,
msg=export_job.get_error_msg()))
return False
else:
time.sleep(1)
timeout -= 1
LOG.warning(L("export timeout.", timeout=timeout, database_name=database_name))
return False
def create_external_table(self, table_name, column_list, engine, property,
broker_property=None, database_name=None, set_null=False):
"""Create broker table.
Args:
table_name: table name, str
column_list: list of columns
engine: str, it should be olap, mysql, elasticsearch, broker
property: map,
if broker engine e.g.:
PROPERTIES (
"broker_name" = "broker_name",
"paths" = "file_path1[,file_path2]",
"column_separator" = "value_separator"
"line_delimiter" = "value_delimiter"
|"format" = "parquet"
)
mysql engine e.g.:
PROPERTIES (
"host" = "mysql_server_host",
"port" = "mysql_server_port",
"user" = "your_user_name",
"password" = "your_password",
"database" = "database_name",
"table" = "table_name"
)
es engine e.g.:
PROPERTIES (
"hosts" = "http://",
"user" = "root",
"password" = "",
"index" = "new_data", "type" = "mbtable" )
            broker_property: broker property; the broker may be hdfs, bos, afs.
set_null: if column can be null
Returns:
True if succeed
"""
database_name = self.database_name if database_name is None else database_name
# table name
sql = 'CREATE EXTERNAL TABLE %s.%s (' % (database_name, table_name)
# columns
for column in column_list:
sql = '%s %s,' % (sql, util.column_to_no_agg_sql(column, set_null))
sql = '%s ) ENGINE=%s' % (sql.rstrip(','), engine)
# property
sql = '%s PROPERTIES %s' % (sql, util.convert_dict2property(property))
# broker_property
if isinstance(broker_property, BrokerInfo):
sql = '%s BROKER PROPERTIES %s' % (sql, broker_property.get_property())
elif isinstance(broker_property, str):
sql = '%s BROKER PROPERTIES (%s)' % (sql, broker_property)
elif isinstance(broker_property, dict):
sql = '%s BROKER PROPERTIES %s' % (sql, util.convert_dict2property(broker_property))
else:
pass
ret = self.execute(sql)
if ret != ():
LOG.info(L('CREATE EXTERNAL TABLE fail.', database_name=database_name,
table_name=table_name))
return False
LOG.info(L('CREATE EXTERNAL TABLE succ.', database_name=database_name,
table_name=table_name))
return True
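    # Usage sketch: a mysql-engine external table; connection values are
    # hypothetical and column_list follows the column format used by the other
    # create-table helpers in this module.
    #   prop = {'host': '127.0.0.1', 'port': '3306', 'user': 'root', 'password': '',
    #           'database': 'example_mysql_db', 'table': 'example_mysql_tbl'}
    #   ok = client.create_external_table('ext_tbl', column_list, 'mysql', prop)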
def stream_load(self, table_name, data_file, database_name=None, host=None, port=None,
user=None, password=None, cluster_name=None, is_wait=True, max_filter_ratio=None,
load_label=None, column_name_list=None, timeout=300, column_separator=None,
partition_list=None, where_filter=None, time_zone=None, **kwargs):
"""
Args:
table_name: 必填,表名
data_file: 必填,上传数据
user: 可选,palo的用户
password: 可选, palo用户的密码
database_name: 可选, 数据库名,默认为当前client的数据库
cluster_name: 可选, cluster,默认为default_cluster.不建议使用
host: 可选,默认为当前client连接的fe
port: 可选,默认为当前client的http_port
-H参数包括:
max_filter_ratio: 可选, 最大容忍可过滤的数据比例0-1
load_label/label: 可选,导入的标签
column_name_list/columns: 可选, 指定导入文件中的列和table中的列的对应关系
timeout: 可选,连接的超时时间
column_separator: 可选, 列的分割符
partition_list/partitions: 可选, 指定本次导入的分区
where_filter/where: 可选,用户抽取部分数据
time_zone/timezone: 可选,用户设置时区
strict_mode: 用户指定此次导入是否开启严格模式,默认为false
exec_mem_limit: 导入内存限制。默认为 2GB。单位为字节
format: 指定导入数据格式,默认是csv,支持json格式
jsonpaths: 导入json方式分为:简单模式和精准模式
strip_outer_array: 为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false
json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""
控制参数:
is_wait: 当返回结果为publish timeout时,是否等待事务visible
Returns:
0 fail/label already exist
1 success
2 publish timeout
"""
database_name = database_name if database_name is not None else self.database_name
host = host if host is not None else self.host
port = port if port is not None else self.http_port
user = user if user is not None else self.user
password = password if password is not None else self.password
uri = "http://%s:%s/api/%s/%s/_stream_load" % (host, port, database_name, table_name)
buf = BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, uri)
c.setopt(c.WRITEFUNCTION, buf.write)
head = dict()
head['Content-length'] = os.path.getsize(data_file)
head['Transfer-Encoding'] = ''
head['Expect'] = '100-continue'
file = open(data_file, "rb")
if load_label:
head['label'] = load_label
if column_name_list:
head['columns'] = ','.join(column_name_list)
if column_separator:
head['column_separator'] = column_separator
if max_filter_ratio:
head['max_filter_ratio'] = max_filter_ratio
if partition_list:
head['partitions'] = ','.join(partition_list)
if where_filter:
head['where'] = where_filter
if time_zone:
head['timezone'] = time_zone
if timeout:
head['timeout'] = timeout
print(data_file)
head.update(kwargs)
print(head)
param = ''
for k, v in head.items():
param = '%s -H "%s:%s"' % (param, k, v)
curl_cmd = 'curl --location-trusted -u %s:%s %s -T %s %s' % (user, password, param, data_file, uri)
LOG.info(L('STREAM LOAD CURL CMD.', cmd=curl_cmd))
print(curl_cmd)
        # set the -H headers
c.setopt(pycurl.HTTPHEADER, [k + ': ' + str(v) for k, v in head.items()])
        # basic auth
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
c.setopt(pycurl.USERNAME, user)
c.setopt(pycurl.PASSWORD, password)
        # upload the file
c.setopt(pycurl.PUT, 1)
c.setopt(pycurl.UPLOAD, 1)
c.setopt(pycurl.READDATA, file)
        # follow redirects, --location-trusted
c.setopt(pycurl.UNRESTRICTED_AUTH, 1)
c.setopt(pycurl.FOLLOWLOCATION, True)
        # connection timeout
c.setopt(pycurl.TIMEOUT, 30)
LOG.info(L('STREAM LOAD.', head=head, url=uri, data_file=data_file))
try:
c.perform()
msg = buf.getvalue()
print(msg)
LOG.info(L('STREAM LOAD ret.', ret=msg))
if c.getinfo(c.HTTP_CODE) == 200:
ret = json.loads(msg)
status = ret.get("Status")
txn_id = ret.get("TxnId")
if status == 'Success':
stream_load_ret = 1
self.wait_txn(txn_id, database_name)
elif status == 'Publish Timeout':
stream_load_ret = 2
if is_wait:
self.wait_txn(txn_id, database_name)
else:
# Label Already Exists/Fail
url = ret.get("ErrorURL")
if url:
sql = 'show load warnings on "%s"' % url
ret = self.execute(sql)
print(ret[0])
stream_load_ret = 0
else:
stream_load_ret = 0
except Exception as e:
stream_load_ret = 0
print(str(e))
LOG.info(L('STREAM LOAD FAILED.', msg=str(e)))
finally:
buf.close()
file.close()
c.close()
return stream_load_ret
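    # Usage sketch for stream_load(); the file, label and headers are hypothetical.
    # The return value is 1 on success, 2 on publish timeout and 0 otherwise.
    #   ret = client.stream_load('example_tbl', '/tmp/example.csv',
    #                            load_label='example_label', column_separator=',',
    #                            max_filter_ratio=0.1)
    #   assert ret == 1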
def insert_select(self, table_name, select_query, column_name_list=None, is_streaming=True, database_name=None):
"""insert"""
if database_name is not None:
sql = 'INSERT INTO %s.%s' % (database_name, table_name)
else:
sql = 'INSERT INTO %s' % table_name
if column_name_list:
sql = '%s (%s)' % (sql, ','.join(column_name_list))
sql = '%s %s' % (sql, select_query)
ret = self.execute(sql)
return ret == ()
def get_column(self, column_name, table_name, database_name=None):
"""get column"""
column_tuple = self.desc_table(table_name, database_name)
for column in column_tuple:
if column_name == palo_job.DescInfo(column).get_field():
return column_name
return None
def get_column_info(self, column_name, table_name, database_name=None):
"""get column info"""
column_tuple = self.desc_table(table_name, database_name)
for column in column_tuple:
if column_name == palo_job.DescInfo(column).get_field():
return column
return None
def get_all_columns(self, table_name, database_name=None):
"""get table columns"""
column_tuple = self.desc_table(table_name, database_name)
column_name_list = list()
for column in column_tuple:
column_name = palo_job.DescInfo(column).get_field()
column_name_list.append(column_name)
return column_name_list
def create_repository(self, repo_name, broker_name, repo_location, repo_properties,
is_read_only=False):
"""
Args:
repo_name: string, repository name
broker_name: string, broker name
repo_location: string, repository location path
repo_properties: dict, property of repository location
"bos_endpoint" = "http://gz.bcebos.com",
"bos_accesskey" = "XXXXXXXXXXXXXXXXXXXX",
"bos_secret_accesskey"="XXXXXXXXXXXXXXXXX"
properties = dict()
properties["bos_endpoint"] = "http://gz.bcebos.com"
properties["bos_accesskey"] = "XXXXXXXXXXXXXXXXXXX"
properties["bos_secret_accesskey"] = "XXXXXXXXXXXXXXXXXXX"
Returns:
True create repo success
False create repo failed
need to check
"""
sql = 'CREATE {readonly} REPOSITORY {repo_name} WITH BROKER {broker_name} \
ON LOCATION "{repo_location}" PROPERTIES ({repo_properties})'
if isinstance(repo_properties, dict):
property = ''
for k, v in repo_properties.items():
property += ' "%s" = "%s",' % (k, v)
p = property.strip(',')
elif isinstance(repo_properties, str):
p = repo_properties
else:
return False
if is_read_only:
readonly = 'READ ONLY'
else:
readonly = ''
s = sql.format(readonly=readonly, repo_name=repo_name, broker_name=broker_name,
repo_location=repo_location, repo_properties=p)
try:
ret = self.execute(s)
except Exception as e:
logging.exception(e)
LOG.info(L('CREATE REPO fail.', repository_name=repo_name, msg=str(e)))
return False
if ret == ():
LOG.info(L('CREATE REPO succ.', repository_name=repo_name))
return True
else:
LOG.info(L('CREATE REPO fail.', repository_name=repo_name))
return False
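    # Usage sketch for create_repository(); endpoint and keys are placeholders and
    # must be replaced with real credentials.
    #   repo_prop = {'bos_endpoint': 'http://gz.bcebos.com',
    #                'bos_accesskey': 'XXXX', 'bos_secret_accesskey': 'XXXX'}
    #   ok = client.create_repository('example_repo', 'example_broker',
    #                                 'bos://bucket/backup', repo_prop)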
def drop_repository(self, repo_name):
"""
Args:
repo_name: string, repository name to be dropped
Returns:
True drop repo success
False drop repo failure
"""
sql = 'DROP REPOSITORY {repo_name}'
s = sql.format(repo_name=repo_name)
try:
ret = self.execute(s)
except Exception as e:
logging.exception(e)
LOG.info(L('DROP REPO fail.', repository_name=repo_name, msg=str(e)))
return False
if ret == ():
LOG.info(L('DROP REPO succ.', repository_name=repo_name))
return True
else:
LOG.info(L('DROP REPO fail.', repository_name=repo_name))
return False
def show_repository(self):
"""
        Returns: existing repos
            RepoId: unique repository id
            RepoName: repository name
            CreateTime: time when the repository was first created
            IsReadOnly: whether the repository is read-only
            Location: root directory in the repository for backing up data
            Broker: the broker the repository depends on
            ErrMsg: Palo checks repository connectivity periodically; errors are shown here
"""
sql = 'SHOW REPOSITORIES'
ret = self.execute(sql)
return ret
def get_repository(self, repo_name=None, repo_info=False):
"""
Args:
repo_name: string, repo name
Returns:
if repo_name exists return
(RepoId, RepoName, CreateTime, IsReadOnly, Location, Broker, ErrMsg)
else return None
"""
r = self.show_repository()
if repo_name is None:
return util.get_attr(r, palo_job.RepoInfo.RepoName)
for repo in r:
if repo_name == repo[palo_job.RepoInfo.RepoName]:
if not repo_info:
return repo
else:
return palo_job.RepoInfo(repo)
return None
def backup(self, snapshot_label, backup_list, repo_name, database_name=None, type=None,
timeout=1800, is_wait=False):
"""
Args:
snapshot_label: string, snapshot name
backup_list: list, table/partition to backup, ['table', 'table2 PARTITION (p1, p2)']
repo_name: string, repo name
database_name: string, db name, if none, use default db
type: string, only support full, full backup
timeout: int, timeout
is_wait: False/True, if wait backup job finish
Returns:
True backup success
False backup failure
"""
sql = 'BACKUP SNAPSHOT {db}.{snapshot_label} TO {repo} on ({backup}) {property}'
b = ','.join(backup_list)
property = 'PROPERTIES({property})'
database_name = self.database_name if database_name is None else database_name
if type is None and timeout is None:
property = ''
else:
p = list()
if type and type.upper() == 'FULL':
p.append('"type"="full"')
if timeout:
p.append('"timeout"="%s"' % timeout)
            property = property.format(property=','.join(p))
s = sql.format(db=database_name, snapshot_label=snapshot_label, repo=repo_name, backup=b,
property=property)
try:
ret = self.execute(s)
except Exception as e:
logging.exception(e)
LOG.info(L('BACKUP SNAPSHOT FAILED.', snapshot_label=snapshot_label, msg=str(e)))
msg = 'Can only run one backup or restore job of a database at same time'
if msg in str(e):
ret = self.show_backup(database_name)
LOG.info(L("Currrent backup jobs", ret=ret))
ret = self.show_restore(database_name)
                LOG.info(L('Current restore job', ret=ret))
return False
if ret != ():
LOG.info(L('BACKUP SNAPSHOT FAILED.', snapshot_label=snapshot_label))
return False
if is_wait is False:
LOG.info(L('BACKUP SNAPSHOT SUCCEEDED.', snapshot_label=snapshot_label))
return True
else:
ret = self.wait_backup_job(snapshot_label)
return ret
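    # Usage sketch for backup(); snapshot label, tables and repo are hypothetical.
    #   ok = client.backup('example_snapshot', ['tbl1', 'tbl2 PARTITION (p1, p2)'],
    #                      'example_repo', is_wait=True)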
def wait_backup_job(self, label=None, database_name=None):
"""
Args:
label: if label is None, wait all backup job finish, else wait label job finish(backup job can not run at the same time in a db)
Returns:
True if backup job is Finished state
False if backup job is Cancelled state
"""
ret = self.show_backup(database_name)
if ret == ():
return True
flag = False
while label is None and flag is False:
flag = True
for backup_job in ret:
s = backup_job[palo_job.BackupJob.State]
if s != 'FINISHED' and s != 'CANCELLED':
flag = False
time.sleep(1)
ret = self.show_backup(database_name)
if flag is True:
LOG.info(L('BACKUP JOB ALL FINISHED, NO JOB RUNNING.'))
return True
timeout = 3600
while timeout > 0:
for backup_job in ret:
if label == backup_job[palo_job.BackupJob.SnapshotName]:
s = backup_job[palo_job.BackupJob.State]
if s == 'FINISHED':
                        LOG.info(L('BACKUP SNAPSHOT FINISHED.', snapshot_label=label))
return True
elif s == 'CANCELLED':
LOG.info(L('BACKUP SNAPSHOT CANCELLED.', snapshot_label=label,
msg=backup_job[palo_job.BackupJob.Status]))
return False
time.sleep(3)
ret = self.show_backup(database_name)
timeout -= 1
LOG.info(L('BACKUP JOB WAIT TIMEOUT.', snapshot_label=label))
return False
def show_backup(self, database_name=None):
"""
Args:
database_name: string, database name
Returns:
((), (),...)
"""
if database_name is None:
sql = 'SHOW BACKUP'
else:
sql = 'SHOW BACKUP FROM {db}'.format(db=database_name)
ret = self.__execute_and_rebuild_meta_class(sql, palo_job.BackupJob)
return ret
def cancel_backup(self, database_name=None):
"""
Args:
database_name: string, database name
Returns:
True if cancel backup job success
False if cancel backup job failure
"""
database_name = self.database_name if database_name is None else database_name
sql = 'CANCEL BACKUP FROM {db}'.format(db=database_name)
ret = self.execute(sql)
if ret == ():
LOG.info(L('CANCEL BACKUP SUCCESS.', database_name=database_name))
return True
else:
LOG.info(L('CANCEL BACKUP FAILURE.', database_name=database_name))
return False
def restore(self, snapshot_name, repo_name, restore_list, database_name=None,
replication_num=None, timeout=1800, is_wait=False):
"""
Args:
snapshot_name: string, snapshot name
repo_name: string, repo name
restore_list: list, table and partition and rename, ['table1', 'table2 PARTITION (p1, p2)', 'table3 PARTITION (p1, p2) AS table_rename']
database_name: string, database name
replication_num: int, replication number
timeout: int, timeout
is_wait: True/False, if wait restore job finished
Returns:
True if restore succeeded
False if restore failed
"""
sql = 'RESTORE SNAPSHOT {db}.{snapshot_name} from {repo_name} on ({restore_list}) ' \
'PROPERTIES("backup_timestamp"="{timestamp}"{property})'
database_name = self.database_name if database_name is None else database_name
r = ','.join(restore_list)
p = ''
if replication_num:
p = '%s, "replication_num"="%d"' % (p, replication_num)
if timeout:
p = '%s, "timeout"="%d"' % (p, timeout)
backup_timestamp = self.__get_backup_timestamp(repo_name, snapshot_name)
if not backup_timestamp:
LOG.info(L('get timestamp error when restore.', snapshot=snapshot_name, repo=repo_name))
return False
s = sql.format(db=database_name, snapshot_name=snapshot_name, repo_name=repo_name,
restore_list=r, timestamp=backup_timestamp, property=p)
try:
ret = self.execute(s)
except Exception as e:
logging.exception(e)
LOG.info(L('RESTORE SNAPSHOT FAILED.', snapshot_name=snapshot_name, msg=str(e)))
msg = 'Can only run one backup or restore job of a database at same time'
if msg in str(e):
ret = self.show_restore(database_name)
                LOG.info(L('Current restore job', ret=ret))
ret = self.show_backup(database_name)
                LOG.info(L('Current backup job', ret=ret))
return False
if ret != ():
LOG.info(L('RESTORE SNAPSHOT FAILED.', snapshot_name=snapshot_name))
return False
if is_wait is False:
            LOG.info(L('RESTORE SNAPSHOT SUCCEEDED.', snapshot_name=snapshot_name))
return True
else:
r = self.wait_restore_job(database_name)
return r
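    # Usage sketch for restore(); the backup timestamp is looked up from the repo
    # automatically, so only hypothetical snapshot/repo/table names are needed.
    #   ok = client.restore('example_snapshot', 'example_repo',
    #                       ['tbl1', 'tbl2 PARTITION (p1, p2) AS tbl2_new'],
    #                       replication_num=1, is_wait=True)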
def show_restore(self, database_name=None):
"""
Args:
database_name: string, database name
Returns:
((), (), ())
"""
if database_name is None:
sql = 'SHOW RESTORE'
else:
sql = 'SHOW RESTORE FROM %s' % database_name
ret = self.__execute_and_rebuild_meta_class(sql, palo_job.RestoreJob)
return ret
def wait_restore_job(self, database_name=None):
"""wait restore job finished"""
ret = self.show_restore(database_name)
if ret == ():
return True
flag = False
while flag is False:
flag = True
for restore_job in ret:
s = restore_job[palo_job.RestoreJob.State]
if s != 'FINISHED' and s != 'CANCELLED':
flag = False
time.sleep(3)
ret = self.show_restore(database_name)
LOG.info(L('RESTORE JOB FINISHED.', state=ret[-1][palo_job.RestoreJob.State],
status=ret[-1][palo_job.RestoreJob.Status]))
return 'FINISHED' == ret[-1][palo_job.RestoreJob.State]
def cancel_restore(self, database_name=None):
"""
Args:
database_name: string, database name
Returns:
True if cancel restore job succeed
            False if cancel restore job fails
"""
database_name = self.database_name if database_name is None else database_name
sql = 'CANCEL RESTORE FROM %s' % database_name
ret = self.execute(sql)
if ret == ():
LOG.info(L('CANCEL RESTORE JOB SUCCEED.', database_name=database_name))
return True
else:
LOG.info(L('CANCEL RESTORE JOB FAIL.', database_name=database_name))
return False
def show_snapshot(self, repo_name, snapshot=None, timestamp=None):
"""
Args:
repo_name: string, repo name
snapshot: string, snapshot name(label)
timestamp: string, backup timestamp
Returns:
((), ())
"""
sql = 'SHOW SNAPSHOT ON %s' % repo_name
if snapshot is None and timestamp is None:
r = self.execute(sql)
return r
sql = '%s WHERE' % sql
if snapshot:
sql = '%s SNAPSHOT = "%s"' % (sql, snapshot)
if timestamp:
sql = '%s AND TIMESTAMP = "%s"' % (sql, timestamp)
elif timestamp:
sql = '%s TIMESTAMP = "%s"' % (sql, timestamp)
r = self.execute(sql)
return r
def __get_backup_timestamp(self, repo_name, snapshot_name):
"""
Args:
snapshot_name: string, snapshot name (label)
Returns:
timestamp: string
"""
r = self.show_snapshot(repo_name, snapshot=snapshot_name)
if len(r) == 1:
return r[0][palo_job.SnapshotInfo.Timestamp]
else:
return None
def routine_load(self, table_name, routine_load_job_name, routine_load_property,
database_name=None, data_source='KAFKA'):
"""
Args:
table_name: string, table name
routine_load_job_name: string, routine load job name
            routine_load_property: RoutineLoadProperty class, routine load properties: load property, job property, data source property
            database_name: string, database name
data_source: string, data source like KAFKA
Returns:
if create routine load ok
"""
create_sql = 'CREATE ROUTINE LOAD {routine_load_job_name} ON {table_name} ' \
'{load_property} {job_property} FROM {data_source} {data_source_property}'
if database_name is not None:
routine_load_job_name = '%s.%s' % (database_name, routine_load_job_name)
        if not isinstance(routine_load_property, RoutineLoadProperty):
            LOG.info(L('CREATE ROUTINE LOAD ERROR. routine load property should be class RoutineLoadProperty'))
            return False
sql = create_sql.format(routine_load_job_name=routine_load_job_name, table_name=table_name,
load_property=routine_load_property.load_property,
job_property=routine_load_property.job_property, data_source=data_source,
data_source_property=routine_load_property.data_source_property)
try:
ret = self.execute(sql)
LOG.info(L('CREATE ROUTINE LOAD JOB OK.'))
return ret == ()
except Exception as e:
LOG.info(L('CREATE ROUTINE LOAD JOB ERROR.', msg=str(e)))
return False
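    # Usage sketch for routine_load(); assumes a RoutineLoadProperty (from palo_sql)
    # already filled with load/job/data-source properties such as the kafka broker
    # list and topic; names are hypothetical.
    #   ok = client.routine_load('example_tbl', 'example_routine_job', prop)
    #   client.wait_routine_load_state('example_routine_job', state='RUNNING')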
def pause_routine_load(self, routine_load_job_name, database_name=None):
"""pause routine load job"""
if database_name is None:
sql = 'PAUSE ROUTINE LOAD FOR %s' % routine_load_job_name
else:
sql = 'PAUSE ROUTINE LOAD FOR %s.%s' % (database_name, routine_load_job_name)
try:
ret = self.execute(sql)
LOG.info(L('PAUSE ROUTINE LOAD OK', name=routine_load_job_name))
return ret == ()
except Exception as e:
LOG.info(L('PAUSE ROUTINE LOAD ERROR', name=routine_load_job_name, msg=str(e)))
return False
def resume_routine_load(self, routine_load_job_name, database_name=None):
"""resume routine load"""
if database_name is None:
sql = 'RESUME ROUTINE LOAD FOR %s' % routine_load_job_name
else:
sql = 'RESUME ROUTINE LOAD FOR %s.%s' % (database_name, routine_load_job_name)
try:
ret = self.execute(sql)
LOG.info(L('RESUME ROUTINE LOAD OK', name=routine_load_job_name))
return ret == ()
except Exception as e:
LOG.info(L('RESUME ROUTINE LOAD ERROR', name=routine_load_job_name, msg=str(e)))
return False
def stop_routine_load(self, routine_load_job_name, database_name=None):
"""stop routine load"""
if database_name is None:
job_name = routine_load_job_name
else:
job_name = '%s.%s' % (database_name, routine_load_job_name)
sql = 'STOP ROUTINE LOAD FOR %s' % job_name
try:
ret = self.execute(sql)
LOG.info(L('STOP ROUTINE LOAD OK', name=routine_load_job_name))
show = self.execute('SHOW ALL ROUTINE LOAD FOR %s' % job_name)
LOG.info(L('SHOW STOPPED ROUTINE LOAD', ret=show))
return ret == ()
except Exception as e:
LOG.info(L('STOP ROUTINE LOAD ERROR', name=routine_load_job_name, msg=str(e)))
show = self.execute('SHOW ALL ROUTINE LOAD FOR %s' % job_name)
LOG.info(L('SHOW STOPPED ROUTINE LOAD', ret=show))
return False
def show_routine_load(self, routine_load_job_name=None, database_name=None, is_all=False):
"""show routine load"""
if is_all is False:
all_word = ''
else:
all_word = 'ALL'
sql = 'SHOW ROUTINE LOAD'
if routine_load_job_name is None:
routine_load_job_name = ''
sql = 'SHOW {all} ROUTINE LOAD'.format(all=all_word)
else:
            if database_name is not None:
                routine_load_job_name = '%s.%s' % (database_name, routine_load_job_name)
sql = 'SHOW {all} ROUTINE LOAD FOR {routine_load_job}'.format(all=all_word,
routine_load_job=routine_load_job_name)
try:
ret = self.execute(sql)
return ret
except Exception as e:
LOG.info(L('SHOW ROUTINE LOAD ERROR', msg=str(e)))
return None
def show_routine_load_task(self, routine_load_job_name):
"""show routine load task"""
sql = 'SHOW ROUTINE LOAD TASK WHERE JOBNAME="%s"' % routine_load_job_name
ret = self.execute(sql)
return ret
def get_routine_load_state(self, routine_load_job_name, database_name=None):
"""get routine load state"""
ret = self.show_routine_load(routine_load_job_name, database_name=database_name)
if ret == () or ret is None:
ret = self.show_routine_load(routine_load_job_name, database_name=database_name,
is_all=True)
if ret == () or ret is None:
return None
LOG.info(L('GET ROUTINE LOAD STATE', state=palo_job.RoutineLoadJob(ret[0]).get_state()))
return palo_job.RoutineLoadJob(ret[0]).get_state()
def wait_routine_load_state(self, routine_load_job_name, state='RUNNING', timeout=600, database_name=None):
"""
Args:
routine_load_job_name: string, routine load job name
            state: string, 'NEED_SCHEDULE', 'PAUSED', 'RUNNING', 'STOPPED'
timeout: int, timeout time
        Returns:
            True if the job reaches the expected state, False on timeout
        """
while timeout > 0:
job_state = self.get_routine_load_state(routine_load_job_name, database_name=database_name)
if job_state == state:
return True
            else:
                time.sleep(1)
                timeout -= 1
        return False
def show_tablet(self, table_name=None, database_name=None, tablet_id=None, partition_list=None):
"""
SHOW TABLETS
        [FROM [db_name.]table_name | tablet_id] [partition(partition_name_1, partition_name_2)]
[where [version=1] [and backendid=10000] [and state="NORMAL|ROLLUP|CLONE|DECOMMISSION"]]
[order by order_column]
[limit [offset,]size]
"""
if table_name is not None and tablet_id is not None:
return None
if table_name:
if database_name is not None:
table_name = '%s.%s' % (database_name, table_name)
sql = 'SHOW TABLETS FROM %s' % table_name
if partition_list:
sql = '%s PARTITION (%s)' % (sql, ','.join(partition_list))
else:
sql = 'SHOW TABLET %s' % tablet_id
ret = self.execute(sql)
return ret
def explain_query(self, sql):
"""explain sql"""
sql = 'EXPLAIN %s' % sql
ret = self.execute(sql)
return ret
def show_txn(self, txn_id, database_name=None):
"""help show TRANSACTION for more msg"""
if database_name is None:
sql = "SHOW TRANSACTION WHERE id = %s" % txn_id
else:
sql = "SHOW TRANSACTION FROM %s WHERE id = %s" % (database_name, txn_id)
ret = self.execute(sql)
return ret
def wait_txn(self, txn_id, database_name=None, status='VISIBLE', timeout=600):
"""wait txn util the status"""
while timeout > 0:
txn = self.show_txn(txn_id, database_name)
txn_status = palo_job.TransactionInfo(txn[0]).get_transaction_status()
if txn_status == status:
LOG.info(L('GET TXN STATUS', txn_id=txn_id, status=txn_status))
return True
            elif txn_status == 'VISIBLE' or txn_status == 'ABORTED':
LOG.info(L('GET TXN STATUS', txn_id=txn_id, status=txn_status))
return False
else:
time.sleep(1)
timeout -= 1
LOG.info(L('GET TXN STATUS TIMEOUT', txn_id=txn_id))
return False
def create_bitmap_index_table(self, table_name, bitmap_index_name, index_column_name,
storage_type=None, database_name=None, create_format=1,
is_wait=False, cluster_name=None):
"""
Create a bitmap index
"""
database_name = self.database_name if database_name is None else database_name
if create_format == 1:
sql = 'ALTER TABLE %s.%s ADD INDEX %s (%s) USING BITMAP' % (database_name, table_name, \
bitmap_index_name, index_column_name)
elif create_format == 2:
sql = 'CREATE INDEX %s ON %s.%s (%s) USING BITMAP' % (bitmap_index_name, database_name, \
table_name, index_column_name)
ret = self.execute(sql)
if ret != ():
LOG.info(L("CREATE BITMAP INDEX fail.", database_name=database_name, \
table_name=table_name, \
bitmap_index_name=bitmap_index_name))
return False
ret = True
if is_wait:
ret = self.wait_table_schema_change_job(table_name, cluster_name=cluster_name,
database_name=database_name)
LOG.info(L("CREATE BITMAP INDEX succ.", database_name=database_name, \
table_name=table_name, \
bitmap_index_name=bitmap_index_name))
return ret
def drop_bitmap_index_table(self, table_name, bitmap_index_name,
storage_type=None, database_name=None, create_format=1,
is_wait=False, cluster_name=None):
"""
Drop a bitmap index
"""
database_name = self.database_name if database_name is None else database_name
if create_format == 1:
sql = 'ALTER TABLE %s.%s DROP INDEX %s' % (database_name, table_name, bitmap_index_name)
elif create_format == 2:
sql = 'DROP INDEX %s ON %s.%s' % (bitmap_index_name, database_name, table_name)
ret = self.execute(sql)
if ret != ():
LOG.info(L("DROP BITMAP INDEX fail.", database_name=database_name, \
table_name=table_name, \
bitmap_index_name=bitmap_index_name))
return False
ret = True
if is_wait:
ret = self.wait_table_schema_change_job(table_name, cluster_name=cluster_name,
database_name=database_name)
LOG.info(L("DROP BITMAP INDEX succ.", database_name=database_name, \
table_name=table_name, \
bitmap_index_name=bitmap_index_name))
return ret
def get_bitmap_index_list(self, table_name, database_name=None):
"""
Get index list from table
"""
database_name = self.database_name if database_name is None else database_name
sql = "SHOW INDEX FROM %s.%s" % (database_name, table_name)
ret = self.execute(sql)
return ret
def is_exists_index_in_table(self, index_name, index_name_col, table_name, database_name=None):
"""
return True if index exists in table, else return False
"""
database_name = self.database_name if database_name is None else database_name
index_list = self.get_bitmap_index_list(table_name, database_name)
if not index_list:
return False
for each_index_info in index_list:
job_get_index = palo_job.TableIndexInfo(each_index_info)
job_index_name = job_get_index.get_key_name()
job_index_column = job_get_index.get_column_name()
job_index_type = job_get_index.get_index_type()
if index_name == job_index_name and index_name_col == job_index_column and \
job_index_type == "BITMAP":
return True
return False
def select_into(self, query, output_file, broker, property=None, format_as=None):
"""
broker is BrokerInfo class
property: dict, for csv, eg.{"column_separator": ",", "line_delimiter": "\n", "max_file_size": "100MB"}
query_stmt
INTO OUTFILE "file:///path/to/file_prefix"
FORMAT AS CSV|PARQUET
PROPERTIES
(broker_propterties & other_properties);
eg:
SELECT * FROM tbl
INTO OUTFILE "hdfs:/path/to/result_"
FORMAT AS CSV
        PROPERTIES
(
"broker.name" = "my_broker",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "doris@YOUR.COM",
"broker.kerberos_keytab" = "/home/doris/my.keytab"
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "100MB"
);
"""
sql = '{query} INTO OUTFILE "{outfile}" {format_as} PROPERTIES ({properties}) '
if format_as is None:
format_as = ''
else:
format_as = 'FORMAT AS %s' % format_as
broker_property = broker.to_select_into_broker_property_str()
if property is not None:
into_properties = ''
for k, v in property.items():
into_properties += ' "%s" = "%s",' % (k, v)
p = into_properties.strip(',')
property = broker_property + ',' + p
else:
property = broker_property
sql = sql.format(query=query, outfile=output_file, format_as=format_as,
properties=property)
LOG.info(L('SELECT INTO.', sql=sql))
rows, ret = self.execute(sql, True)
LOG.info(L('SELECT INTO ret.', ret=ret))
return ret
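    # Usage sketch for select_into(); 'broker' must be a BrokerInfo instance and the
    # output path is hypothetical.
    #   ret = client.select_into('SELECT * FROM example_tbl',
    #                            'hdfs://nn:8020/tmp/result_', broker,
    #                            property={'column_separator': ','}, format_as='CSV')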
def set_variables(self, k, v, is_global=False):
"""
set variables
        for GLOBAL variables a reconnect is required before SHOW reflects the new value
"""
if is_global:
sql = 'SET GLOBAL %s=%s' % (k, v)
else:
sql = 'SET %s=%s' % (k, v)
ret = self.execute(sql)
return ret == ()
def show_variables(self, prefix=None):
"""
show variables
"""
if prefix:
sql = 'SHOW VARIABLES LIKE "%%%s%%"' % prefix
else:
sql = 'SHOW VARIABLES'
ret = self.execute(sql)
return ret
def wait_routine_load_commit(self, routine_load_job_name, committed_expect_num, timeout=600):
"""wait task committed"""
        print('expect committed rows: %s\n' % committed_expect_num)
while timeout > 0:
ret = self.show_routine_load(routine_load_job_name)
routine_load_job = palo_job.RoutineLoadJob(ret[0])
loaded_rows = routine_load_job.get_loaded_rows()
print(loaded_rows)
if str(loaded_rows) == str(committed_expect_num):
time.sleep(3)
return True
timeout -= 3
time.sleep(3)
return False
def enable_feature_batch_delete(self, table_name, database_name=None, is_wait=True):
"""enable feature batch delete"""
if database_name is None:
sql = 'ALTER TABLE %s ENABLE FEATURE "BATCH_DELETE"' % table_name
else:
sql = 'ALTER TABLE %s.%s ENABLE FEATURE "BATCH_DELETE"' % (database_name, table_name)
ret = self.execute(sql)
if is_wait:
            ret = self.wait_table_schema_change_job(table_name, database_name=database_name)
return ret
return ret == ()
def truncate(self, table_name, partition_list=None, database_name=None):
"""truncate table / partition"""
if database_name is None:
sql = 'TRUNCATE TABLE %s' % table_name
else:
sql = 'TRUNCATE TABLE %s.%s' % (database_name, table_name)
if partition_list is not None:
sql = '%s PARTITION (%s)' % (sql, ','.join(partition_list))
ret = self.execute(sql)
return ret == ()
def commit(self):
"""commit"""
ret = self.execute('COMMIT')
return ret == ()
def begin(self):
"""begin"""
ret = self.execute('BEGIN')
return ret == ()
def rollback(self):
"""rollback"""
ret = self.execute('ROLLBACK')
return ret == ()
def update(self, table_name, set_list, where_clause=None, database_name=None):
"""
ref: UPDATE table_reference SET assignment_list [WHERE where_condition]
table_name: str
set_list: ['k1=2', 'k3=k3+1']
        where_clause: str or ['k1 > 0']; when a list, the conditions are joined with AND
"""
if database_name is not None:
table_name = '%s.%s' % (database_name, table_name)
if isinstance(set_list, list):
set_ref = ','.join(set_list)
else:
set_ref = set_list
if where_clause is not None:
if isinstance(where_clause, str):
where_ref = 'WHERE %s' % where_clause
elif isinstance(where_clause, list):
where_ref = 'WHERE %s' % ' AND '.join(where_clause)
# else: pass
else:
where_ref = ''
sql = 'UPDATE {tb} SET {set_ref} {where_ref}'.format(tb=table_name,
set_ref=set_ref,
where_ref=where_ref)
ret = self.execute(sql)
return ret == ()
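    # Usage sketch for update(); with a list where_clause the conditions are
    # AND-joined (hypothetical table):
    #   client.update('example_tbl', ['v1 = v1 + 1'], where_clause=['k1 > 0', 'k2 = 2'])
    #   # -> UPDATE example_tbl SET v1 = v1 + 1 WHERE k1 > 0 AND k2 = 2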
def admin_show_config(self, key=None):
"""ADMIN SHOW FRONTEND CONFIG [LIKE "pattern"]"""
if key is None:
sql = 'ADMIN SHOW FRONTEND CONFIG'
else:
sql = 'ADMIN SHOW FRONTEND CONFIG LIKE "%s"' % key
return self.execute(sql)
def show_dynamic_partition_tables(self, database_name=None):
"""
get dynamic partition table families
"""
if database_name is None:
sql = "SHOW DYNAMIC PARTITION TABLES"
result = self.execute(sql)
return result
sql = "SHOW DYNAMIC PARTITION TABLES FROM %s" % database_name
result = self.execute(sql)
return result
def get_comment(self, database_name, table_name):
"""
get table comment
"""
sql = "select TABLE_COMMENT from information_schema.TABLES where TABLE_SCHEMA='%s' and TABLE_NAME='%s'" \
% (database_name, table_name)
ret = self.execute(sql)
return ret[0][0]
def get_column_comment(self, table_name, column_name):
"""
get column comment
"""
sql = "show full columns from %s" % table_name
columns = self.execute(sql)
        for column in columns:
            if column[0] == column_name:
                # index 8 is the Comment field in SHOW FULL COLUMNS output
                return column[8]
        return None
def create_sync_job(self, table_name, database_name, mysql_table_name, mysql_database_name, job_name, canal_ip,
columns=None, partitions=None, canal_port='11111', destination='example', batch_size='8192',
username='', password='', is_wait=True):
"""
create sync job
        table_name: str or list, the Doris table(s) to load into, e.g. 'table_1' or ['table_1', 'table_2']
        database_name: str, the Doris database that owns the binlog sync job
        mysql_table_name: str or list, the source MySQL table(s), corresponding one-to-one
            with the Doris tables, e.g. ['mysql_table_1', 'mysql_table_2']
        mysql_database_name: str or list, the database(s) of the source MySQL tables
        columns: list or None, the column mapping per channel, e.g. ['k1', 'k2'] or [['k1', 'k2'], ['k2', 'v1']]
        partitions: list or None, the target partitions per channel, e.g. ['p1', 'p2'] or [[], ['p1', 'p3']]
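
        Example (hypothetical names; one channel per table pair):
            client.create_sync_job(['tbl_a', 'tbl_b'], 'demo_db',
                                   ['mysql_a', 'mysql_b'], ['mysql_db', 'mysql_db'],
                                   'demo_sync_job', '127.0.0.1',
                                   columns=[['k1', 'v1'], []])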
"""
if not isinstance(table_name, list):
table_name = [table_name, ]
if not isinstance(mysql_table_name, list):
mysql_table_name = [mysql_table_name, ]
        if not isinstance(mysql_database_name, list):
            # a single database may serve several tables, so repeat it per channel
            mysql_database_name = [mysql_database_name] * len(table_name)
if columns is not None and not isinstance(columns[0], list):
columns = [columns]
if partitions is not None and not isinstance(partitions[0], list):
partitions = [partitions]
if len(table_name) != len(mysql_table_name):
            LOG.info(L('the number of doris tables does not match the number of mysql tables'))
return False
channel_desc = ""
for i in range(len(table_name)):
column_desc = ''
            if columns is not None and columns[i] != []:
for column in columns[i]:
column_desc = '%s%s,' % (column_desc, column)
column_desc = '(%s)' % column_desc[:-1]
partition_desc = ''
            if partitions is not None and partitions[i] != []:
if len(partitions[i]) > 1:
for partition in partitions[i]:
partition_desc = '%s%s,' % (partition_desc, partition)
partition_desc = 'PARTITIONS (%s)' % partition_desc[:-1]
else:
partition_desc = 'PARTITION (%s)' % partitions[i][0]
else:
partition_desc = ''
channel_desc = "%s FROM %s.%s INTO %s %s %s," % (channel_desc, mysql_database_name[i], \
mysql_table_name[i], table_name[i], partition_desc,
column_desc)
sql = "CREATE SYNC %s.%s (%s)" \
"FROM BINLOG (" \
"'type' = 'canal'," \
"'canal.server.ip' = '%s'," \
"'canal.server.port' = '%s'," \
"'canal.destination' = '%s'," \
"'canal.batchSize' = '%s'," \
"'canal.username' = '%s'," \
"'canal.password' = '%s')" % (database_name, job_name, channel_desc[:-1], canal_ip, canal_port, \
destination, batch_size, username, password)
ret = self.execute(sql)
if ret != ():
LOG.info(L('CREATE SYNC JOB fail.', job_name=job_name, database_name=database_name))
return False
if is_wait:
ret = self.wait_binlog_state(job_name)
if not ret:
LOG.info(L('CREATE SYNC JOB fail.', database_name=database_name, job_name=job_name))
return False
LOG.info(L('CREATE SYNC JOB succ.', database_name=database_name, job_name=job_name))
return True
def wait_binlog_state(self, job_name, state='RUNNING'):
"""
wait modify binlog job state
"""
job_state = self.get_sync_job_state(job_name)
if job_state == state:
LOG.info(L('SYNC JOB %s' % state))
return True
        retries = 10
        while retries > 0:
            time.sleep(2)
            job_state = self.get_sync_job_state(job_name)
            if job_state == state:
                LOG.info(L('SYNC JOB %s' % state))
                return True
            retries -= 1
LOG.info(L('SYNC JOB STATE ERROR', EXPECTED=state, ACTUAL=job_state))
return False
def pause_sync_job(self, job_name, database_name=None, is_wait=True):
"""
pause sync job
"""
if database_name is None:
sql = 'PAUSE SYNC JOB %s' % job_name
else:
sql = 'PAUSE SYNC JOB %s.%s' % (database_name, job_name)
ret = self.execute(sql)
if ret != ():
LOG.info(L('PAUSE SYNC JOB fail.', job_name=job_name, database_name=database_name))
return False
if is_wait:
ret = self.wait_binlog_state(job_name, 'PAUSED')
if not ret:
LOG.info(L('PAUSE SYNC JOB fail.', database_name=database_name, job_name=job_name))
return False
LOG.info(L('PAUSE SYNC JOB succ.', database_name=database_name, job_name=job_name))
return True
def resume_sync_job(self, job_name, database_name=None, is_wait=True):
"""
resume sync job
"""
if database_name is None:
sql = 'RESUME SYNC JOB %s' % job_name
else:
sql = 'RESUME SYNC JOB %s.%s' % (database_name, job_name)
ret = self.execute(sql)
if ret != ():
LOG.info(L('RESUME SYNC JOB fail.', job_name=job_name, database_name=database_name))
return False
if is_wait:
ret = self.wait_binlog_state(job_name)
if not ret:
LOG.info(L('RESUME SYNC JOB fail.', database_name=database_name, job_name=job_name))
return False
LOG.info(L('RESUME SYNC JOB succ.', database_name=database_name, job_name=job_name))
return True
def stop_sync_job(self, job_name, database_name=None, is_wait=True):
"""
stop sync job
"""
if database_name is None:
sql = 'STOP SYNC JOB %s' % job_name
else:
sql = 'STOP SYNC JOB %s.%s' % (database_name, job_name)
ret = self.execute(sql)
if ret != ():
LOG.info(L('STOP SYNC JOB fail.', job_name=job_name, database_name=database_name))
return False
if is_wait:
ret = self.wait_binlog_state(job_name, 'CANCELLED')
if not ret:
LOG.info(L('STOP SYNC JOB fail.', database_name=database_name, job_name=job_name))
return False
LOG.info(L('STOP SYNC JOB succ.', database_name=database_name, job_name=job_name))
return True
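    # Sync-job lifecycle sketch for the three helpers above (job name is
    # hypothetical):
    #     client.pause_sync_job('demo_sync_job')    # waits for PAUSED
    #     client.resume_sync_job('demo_sync_job')   # waits for RUNNING
    #     client.stop_sync_job('demo_sync_job')     # waits for CANCELLED (terminal)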
def get_sync_job_state(self, job_name):
"""
get sync job state
"""
sync_job_list = self.show_sync_job()
condition_col_idx = palo_job.SyncJobInfo.JobName
        return_col_idx = palo_job.SyncJobInfo.State
        return util.get_attr_condition_value(sync_job_list, condition_col_idx, job_name, return_col_idx)
def show_sync_job(self, database_name=None):
"""
get sync job information
"""
        if database_name is None:
            sql = "SHOW SYNC JOB"
        else:
            sql = "SHOW SYNC JOB FROM %s" % database_name
        return self.execute(sql)
def set_frontend_config(self, config, value):
"""
admin set frontend config
"""
sql = 'ADMIN SET FRONTEND CONFIG ("%s" = "%s")' % (config, value)
return self.execute(sql)
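    # Usage sketch (hypothetical value; pair with admin_show_config to verify):
    #     client.set_frontend_config('max_routine_load_task_concurrent_num', '10')
    #     client.admin_show_config('max_routine_load_task_concurrent_num')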
def get_partition_replica_allocation(self, table_name, partition_name, database_name=None):
"""
get table family replica allocation
"""
partition = self.get_partition(table_name, partition_name, database_name)
if partition:
return partition[palo_job.PartitionInfo.ReplicaAllocation]
else:
return None
def modify_resource_tag(self, host_name, port, tag_location):
"""
        tag_location: str, e.g. 'group_a', or dict, e.g. {'tag.location': 'a', 'tag.compute': 'b', ...}
        Modify the tag(s) of a BE node.
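
        Example (hypothetical BE address; single tag vs. multiple properties):
            client.modify_resource_tag('10.0.0.1', 9050, 'group_a')
            client.modify_resource_tag('10.0.0.1', 9050, {'tag.location': 'group_a'})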
"""
if isinstance(tag_location, str):
sql = "ALTER SYSTEM MODIFY BACKEND '%s:%s' SET ('tag.location'='%s')" % (host_name, port, tag_location)
elif isinstance(tag_location, dict):
sql = "ALTER SYSTEM MODIFY BACKEND '%s:%s' SET %s" % (host_name, port,
util.convert_dict2property(tag_location))
else:
return None
result = self.execute(sql)
time.sleep(2)
if result != ():
LOG.info(L('MODIFY BACKEND FAIL', backend=host_name, port=port, tag_location=tag_location))
return False
return True
def get_replica_backend_id(self, table_name):
"""get BackendId from replica status"""
sql = "ADMIN SHOW REPLICA STATUS FROM %s" % table_name
replica_status = self.execute(sql)
column_idx = palo_job.ReplicaStatus.BackendId
return util.get_attr(replica_status, column_idx)
def admin_check_tablet(self, tablet_id_list):
"""admin check tablet"""
sql = "ADMIN CHECK TABLE (%s) PROPERTIES('type'='consistency')" % ','.join(tablet_id_list)
self.execute(sql)
def admin_repair_table(self, table_name, partition_list=None):
"""admin repair table"""
sql = "ADMIN REPAIR TABLE %s" % table_name
if partition_list is not None:
sql = '%s PARTITION (%s)' % (sql, ','.join(partition_list))
self.execute(sql)
def admin_diagnose_tablet(self, tablet_id):
"""admin diagnose tablet"""
sql = "ADMIN DIAGNOSE TABLET %s" % tablet_id
ret = self.execute(sql)
return ret
def get_resource_tag(self, host_ip):
"""通过be的IP获取be的标签信息"""
backend_list = self.get_backend_list()
for backend in backend_list:
be = palo_job.BackendProcInfo(backend)
if be.get_ip() == host_ip:
return be.get_tag()
return None
def get_resource_tag_by_id(self, be_id):
"""获取backend tag by id"""
be = self.get_backend(be_id)
if not be:
return None
return palo_job.BackendProcInfo(be).get_tag()