datahub/client/common/thread_pool.py (126 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
import queue
import threading
from concurrent.futures import Future
class TaskItem:
def __init__(self, future, func, args, kwargs):
self._future = future
self._func = func
self._args = args
self._kwargs = kwargs
def run(self):
if not self._future.set_running_or_notify_cancel():
return
try:
self._future.set_result(self._func(*self._args, **self._kwargs))
except BaseException as e:
self._future.set_exception(e)
@property
def future(self):
return self._future
@future.setter
def future(self, value):
self._future = value
class ThreadPool:
__slots__ = '_name', '_shut_down', '_shut_down_lock', \
'_queue_limit', '_queue', '_workers_limit', '_workers'
def __init__(self, queue_limit=0, workers_limit=8, name="ThreadPool"):
self._name = name
self._shut_down = False
self._shut_down_lock = threading.Lock()
self._queue_limit = sys.maxsize if queue_limit <= 0 else queue_limit
self._queue = queue.Queue(self._queue_limit)
self._workers_limit = workers_limit
self._workers = [
threading.Thread(target=self.__worker_run, name="thread_{}".format(_))
for _ in range(self._workers_limit)
]
list(map(lambda th: th.start(), self._workers))
def shutdown(self, wait_done=True, cancel_futures=False):
with self._shut_down_lock:
if self._shut_down:
return
self._shut_down = True
if cancel_futures:
while True:
try:
task = self._queue.get_nowait()
except queue.Empty:
break
if task is not None:
task.future.cancel()
if wait_done:
list(map(lambda th: th.join(), self._workers))
self._workers.clear()
def submit(self, func, *args, **kwargs):
return self.__submit_task(func, True, *args, **kwargs)
def submit_nowait(self, func, *args, **kwargs):
return self.__submit_task(func, False, *args, **kwargs)
def __submit_task(self, func, block, *args, **kwargs):
with self._shut_down_lock:
if self._shut_down:
return False
future = Future()
task = TaskItem(future, func, args, kwargs)
try:
self._queue.put(task, block=block)
except queue.Full:
return False
return future
def __worker_run(self):
while True:
try:
task = self._queue.get(block=True, timeout=1)
except queue.Empty:
if self._shut_down:
break
continue
if task is not None:
task.run()
class HashThreadPool:
__slots__ = '_name', '_shut_down', '_shut_down_lock', \
'_queue_limit', '_workers_limit', '_workers'
def __init__(self, queue_limit=0, workers_limit=8, name="HashThreadPool"):
self._name = name
self._shut_down = False
self._shut_down_lock = threading.Lock()
self._queue_limit = sys.maxsize if queue_limit <= workers_limit else queue_limit
self._workers_limit = workers_limit
self._workers = [
ThreadPool(max(self._queue_limit // self._workers_limit, 1), 1, "ThreadPool_{}".format(_))
for _ in range(self._workers_limit)
]
def shutdown(self, wait_done=True, cancel_futures=False):
with self._shut_down_lock:
if self._shut_down:
return
self._shut_down = True
for worker in self._workers:
worker.shutdown(wait_done, cancel_futures)
self._workers.clear()
def submit(self, key, func, *args, **kwargs):
with self._shut_down_lock:
if self._shut_down:
return False
index = key % self._workers_limit
worker = self._workers[index]
return worker.submit(func, *args, **kwargs)
def submit_nowait(self, key, func, *args, **kwargs):
with self._shut_down_lock:
if self._shut_down:
return False
index = key % self._workers_limit
worker = self._workers[index]
return worker.submit_nowait(func, *args, **kwargs)
def do_something(num):
print("id = ", num)
if __name__ == "__main__":
tp = ThreadPool()
for id in range(100):
tp.submit(do_something, id)
# time.sleep(1)
tp.shutdown()
print('++++++++++++++++')
htp = HashThreadPool()
for id in range(100):
htp.submit(id, do_something, id)
# time.sleep(1)
htp.shutdown()