J'écris une sous-classe threading.Thread
qui implémente les méthodes 'callable'.Comment puis-je améliorer ce code
Normalement avec une méthode arbitraire sur un thread, la méthode s'exécute dans n'importe quel thread l'appelle. Ces méthodes s'exécutent dans le thread sur lequel elles sont appelées et bloquent l'exécution ultérieure dans le thread appelant ou renvoient un moyen d'obtenir le résultat en fonction du décorateur utilisé pour créer la méthode. Le code est:
import threading
import collections
class CallableThread(threading.Thread):
def __init__(self, *args, **kwargs):
#_enqueued_calls is used to store tuples that encode a functin call. It is processed by the run method
self._enqueued_calls = collections.deque()
# _enqueue_call_permission is for callers to signal that they have placed something on the queue
self._enqueue_call_permission = threading.Condition()
super(CallableThread, self).__init__(*args, **kwargs)
@staticmethod
def blocking_method(f):
u"""A decorator function to implement a blocking method on a thread"""
# the returned function enqueues the decorated function and blocks until the decorated function
# is called and returns. It then returns the value unmodified. The code in register runs
# in the calling thread and the decorated method runs in thread that it is called on
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response = collections.deque()
with self._enqueue_call_permission:
self._enqueued_calls.append(((f, self, args, kwargs), response, call_complete))
self._enqueue_call_permission.notify()
with call_complete:
if not response:
call_complete.wait()
return response.popleft()
return register
@staticmethod
def nonblocking_method(f):
u"""A decorator function to implement a non-blocking method on a thread"""
# the returned function enqueues the decorated function and returns a tuple consisting of a condition
# to wait for and a deque to read the result out of. The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response = collections.deque()
with self._enqueue_call_permission:
self._enqueued_calls.append(((f, self, args, kwargs), None, None))
self._enqueue_call_permission.notify()
return call_complete, response
return register
def run(self):
self._run = True
while self._run: # while we've not been killed
with self._enqueue_call_permission: # get the condition so we can wait on it.
if not self._enqueued_calls:
self._enqueue_call_permission.wait() # wait if we need to
while self._enqueued_calls:
((f, self, args, kwargs), response_deque, call_complete) = self._enqueued_calls.popleft()
with call_complete:
response_deque.append(f(self, *args, **kwargs))
call_complete.notify()
def stop(self):
u""" Signal the thread to stop"""
self._run = False
if __name__=='__main__':
class TestThread(CallableThread):
u"""Increment a counter on each call and print the value"""
counter = 0
@CallableThread.blocking_method
def increment(self, tag):
print "{0} FROM: {1}".format(self.counter, tag)
self.counter += 1
class ThreadClient(threading.Thread):
def __init__(self, callable_thread, tag):
self.callable_thread = callable_thread
self.tag = tag
super(ThreadClient, self).__init__()
def run(self):
for i in range(0, 4):
self.callable_thread.increment(self.tag)
t = TestThread()
t.start()
clients = [ThreadClient(t, i) for i in range(0, 10)]
for client in clients:
client.start()
## client.join()
for client in clients:
client.join()
t.stop()
Comme vous pouvez sans doute le voir, j'utilise des méthodes statiques comme décorateurs. Les décorateurs prennent la méthode à laquelle ils sont appliqués et renvoient une fonction qui met en file d'attente la fonction décorée avec les arguments avec lesquels elle est appelée, une instance threading.Condition à notify
et une instance collections.deque
pour corriger le résultat.
Des suggestions? Je suis particulièrement intéressé par le nommage, les points architecturaux et la robustesse
EDIT: certaines modifications que j'ai faites sur la base de suggestions alors que j'étais loin d'un interpréteur ont cassé le code, donc je l'ai juste corrigé.
Je pense que vous avez une faute de frappe impliquant _enqueue_call_permission (dans def run) – cpf