core/lib/hook.py (105 lines of code) (raw):
#!/usr/bin/env python3
"""
Copyright (c) 2017-present, Facebook, Inc.
All rights reserved.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
"""
import codecs
import functools
import logging
import time
from threading import Thread
from .error import OSCError
log = logging.getLogger(__name__)
def wrap_hook(func):
"""
Decorator for wrapping hooks around payload functions.
decorate function with @wrap_hook so that two hooks point will
be automatically wrapped around the execution.
For example:
@wrap_hook
def foo(self):
pass
will register both 'before_foo' and 'after_foo', which will be invoked
before and after the foo function being executed.
"""
@functools.wraps(func)
def func_with_hook(self, *args, **kwargs):
self.execute_hook("before_{}".format(func.__name__))
result = func(self, *args, **kwargs)
self.execute_hook("after_{}".format(func.__name__))
return result
return func_with_hook
class HookBase(object):
"""
Base hook, cannot be used directly
"""
def __init__(self, critical=False, **kwargs):
self.critical = critical
def execute(self, payload):
try:
self._execute(payload)
log.debug("Hook excution finished")
except Exception as e:
if self.critical:
raise
else:
log.exception("Hook execution error: {}".format(e))
def _execute(self, payload):
raise NotImplementedError("_execute function in Hook not implemented")
class NoopHook(HookBase):
"""
None-op hook, this is the default hook if we don't specify an override for
certain hook point
"""
def _execute(self, payload):
log.debug("Noop hook, doing nothing here")
class SQLHook(HookBase):
"""
Hook for executing SQLs inside sql_file_path
"""
def __init__(self, sql_file_path="", *args, **kwargs):
super(SQLHook, self).__init__(*args, **kwargs)
self.file_path = sql_file_path
self._dbh = None
self._is_select = None
self._sqls = []
self._expected_lines = []
self.read_sqls()
def read_sqls(self):
log.debug("Reading {}".format(self.file_path))
with codecs.open(self.file_path, "r", "utf-8") as fh:
current_sql = ""
for line in fh:
# ignore sql comments
if line.startswith("--"):
continue
# ignore empty line
if not line.strip():
continue
if self._is_select is None:
if line.startswith("SELECT"):
# The first line of expected result is always SELECT
# statement
self._is_select = True
self.critical = True
self._sqls.append(line)
continue
else:
self._is_select = False
if self._is_select:
self._expected_lines.append(line.strip())
else:
current_sql += line
if line.endswith(";\n"):
self._sqls.append(current_sql)
current_sql = ""
log.debug(self._sqls)
def execute_sqls(self):
"""
Execute the given sql against MySQL without caring about the result
output
"""
# If the first line start with 'SELECT' then it's an assertion. We
# should check the result set we get from MySQL against the rows
# written as the rest of the SQL file
if self._is_select:
result = self._dbh.query_array(self._sqls[0])
if len(result) != len(self._expected_lines):
raise OSCError(
"ASSERTION_ERROR",
{
"expected": "{} lines of result set".format(
len(self._expected_lines)
),
"got": "{} lines of result set".format(len(result)),
},
)
for idx, expected_row in enumerate(self._expected_lines):
got_line = "\t".join([str(col) for col in result[idx]])
if got_line != expected_row:
raise OSCError(
"ASSERTION_ERROR", {"expected": expected_row, "got": got_line}
)
else:
for sql in self._sqls:
log.debug("Running the following SQL on MySQL: {} ".format(sql))
self._dbh.execute(sql)
def _execute(self, payload):
self._dbh = payload.conn
log.info("Running sql file: {} for {}".format(self.file_path, payload.socket))
class SQLNewConnHook(SQLHook):
"""
Hook for execute SQLs inside a file using a separate connection to
database. This is useful when you don't want to reuse the same connection
as the one used for OSC operation, so that it can has different sql_mode,
session setting, etc
"""
def _execute(self, payload):
self._dbh = payload.get_conn(payload.current_db)
log.info("Running sql file: {} for {}".format(self.file_path, payload.socket))
self.execute_sqls()
self._dbh.close()
class SQLNewConnInThreadHook(SQLHook):
"""
Hook for execute SQLs inside a file using a separate database connection
and inside a separate thread.
This is useful when you want to run a slow SQL in the trigger, and don't
want to block the main OSC logic from running
"""
def _execute(self, payload):
thd = Thread(target=self.execute_sqls)
self._dbh = payload.get_conn(payload.current_db)
log.info("Running sql file: {} for {}".format(self.file_path, payload.socket))
thd.start()
# Wait for a while to make sure the SQL has started running before we
# proceed
time.sleep(1)