leda_python/refactoring/thread.py (59 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding:-utf-8 -*- import queue import threading import logging _logger = logging.getLogger('dbus_threadPool') class Thread(threading.Thread): ''' 定义一个能够处理任务的线程类,属于自定义线程类,自定义线程类就需要定义run()函数 ''' def __init__(self, taskQueue): threading.Thread.__init__(self) self.taskQueue = taskQueue self.stopFlag = False self.setDaemon(True) self.start() def enableStopFlag(self): self.stopFlag = True def run(self): ''' 重构run 方法 ''' while (True): if (True == self.stopFlag): break try: _logger.debug("") _logger.debug("getting ruleTask from dbusThreadPool queue...") func, kwargs = self.taskQueue.get(block=True, timeout=10) func(**kwargs) except queue.Empty: _logger.debug("dbusThreadPool Queue is empty and have no ruleTask in the queue") except: _logger.exception("dbusThreadPool: task exec err#####") class ThreadPool(object): ''' 自定义线程池 ''' def __init__(self, threadNum, taskNum): self.threadList = [] self.taskQueue = queue.Queue(maxsize=taskNum) self._init_threadPool(threadNum) def _init_threadPool(self, threadNum): for i in range(threadNum): thread = Thread(self.taskQueue) self.threadList.append(thread) def addTask(self, func, **kwargs): try: self.taskQueue.put((func, kwargs), block=False) except queue.Full: _logger.warning("dbusThreaPool Queue is overflowed") pass def releasThread(self): """ 关闭线程池中所有的线程 """ for item in self.threadList: if (item.isAlive()): item.enableStopFlag() item.join() # 函数名称: createThread # 功能描述: 创建独立线程 # 输入参数: # task: 任务 # args: 任务的参数 # 返 回 值: 无 def createThread(task, args=()): t = threading.Thread(target=task, args=args) t.setDaemon(True) t.start()