aliyun/log/es_migration/migration_log.py (65 lines of code) (raw):
#!/usr/bin/env python
# encoding: utf-8
# Copyright (C) Alibaba Cloud Computing
# All rights reserved.
import time
import logging
from aliyun.log import (
LogException, LogClient, SimpleLogHandler, IndexLineConfig, IndexConfig,
)
_log_fields = [
'asctime',
'filename',
'funcName',
'levelname',
'lineno',
'module',
'message',
'process',
]
_migration_logstore = 'internal-es-migration-log'
def setup_logging(migration, endpoint, project, access_key_id, access_key):
# setup internal logstore
_setup_migration_logstore(endpoint, project, access_key_id, access_key)
time.sleep(10)
# set logging level for libs
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
# add aliyunlog handler
root = logging.getLogger()
root.setLevel(logging.INFO)
fields = 'level,process_id,thread_id,module,func_name'.split(',')
handler = SimpleLogHandler(
end_point=endpoint,
access_key_id=access_key_id,
access_key=access_key,
project=project,
log_store=_migration_logstore,
fields=fields,
extract_json=True,
)
handler.skip_message = False
handler.built_in_root_field = 'logging'
handler.log_tags = [
('__migration__', migration),
]
root.addHandler(handler)
def _setup_migration_logstore(endpoint, project, access_key_id, access_key):
log_client = LogClient(
endpoint=endpoint,
accessKeyId=access_key_id,
accessKey=access_key,
)
try:
log_client.create_logstore(
project_name=project,
logstore_name=_migration_logstore,
)
except LogException as exc:
if exc.get_error_code() != "LogStoreAlreadyExist":
raise
try:
tokens = [
',', ' ', "'", '"', ';', '=', '(', ')', '[', ']', '{', '}', '?',
'@', '&', '<', '>', '/', ':', '\n', '\t', '\r',
]
line_config = IndexLineConfig(token_list=tokens)
config = IndexConfig(line_config=line_config)
log_client.create_index(project, _migration_logstore, config)
except LogException as exc:
if exc.get_error_code() != "IndexAlreadyExist":
raise