2010-07-26 28 views
1

J'écris une sous-classe threading.Thread qui implémente les méthodes 'callable'.Comment puis-je améliorer ce code

Normalement avec une méthode arbitraire sur un thread, la méthode s'exécute dans n'importe quel thread l'appelle. Ces méthodes s'exécutent dans le thread sur lequel elles sont appelées et bloquent l'exécution ultérieure dans le thread appelant ou renvoient un moyen d'obtenir le résultat en fonction du décorateur utilisé pour créer la méthode. Le code est:

import threading 
import collections 


class CallableThread(threading.Thread): 

    def __init__(self, *args, **kwargs): 
     #_enqueued_calls is used to store tuples that encode a functin call. It is processed by the run method 
     self._enqueued_calls = collections.deque() 
     # _enqueue_call_permission is for callers to signal that they have placed something on the queue 
     self._enqueue_call_permission = threading.Condition()     
     super(CallableThread, self).__init__(*args, **kwargs) 

    @staticmethod 
    def blocking_method(f): 
     u"""A decorator function to implement a blocking method on a thread""" 
     # the returned function enqueues the decorated function and blocks until the decorated function 
     # is called and returns. It then returns the value unmodified. The code in register runs 
     # in the calling thread and the decorated method runs in thread that it is called on 
     def register(self, *args, **kwargs): 
      call_complete = threading.Condition() 
      response = collections.deque() 
      with self._enqueue_call_permission: 
       self._enqueued_calls.append(((f, self, args, kwargs), response, call_complete)) 
       self._enqueue_call_permission.notify() 
      with call_complete: 
       if not response: 
        call_complete.wait() 
      return response.popleft() 
     return register 

    @staticmethod 
    def nonblocking_method(f): 
     u"""A decorator function to implement a non-blocking method on a thread""" 
     # the returned function enqueues the decorated function and returns a tuple consisting of a condition 
     # to wait for and a deque to read the result out of. The code in register runs in the calling thread 
     # and the decorated method runs in thread that it is called on 
     def register(self, *args, **kwargs): 
      call_complete = threading.Condition() 
      response = collections.deque() 
      with self._enqueue_call_permission: 
       self._enqueued_calls.append(((f, self, args, kwargs), None, None)) 
       self._enqueue_call_permission.notify() 
      return call_complete, response 
     return register  

    def run(self):   
     self._run = True 
     while self._run: # while we've not been killed 
      with self._enqueue_call_permission: # get the condition so we can wait on it. 
       if not self._enqueued_calls: 
        self._enqueue_call_permission.wait() # wait if we need to 
      while self._enqueued_calls: 
       ((f, self, args, kwargs), response_deque, call_complete) = self._enqueued_calls.popleft() 
       with call_complete:  
        response_deque.append(f(self, *args, **kwargs)) 
        call_complete.notify() 


    def stop(self): 
     u""" Signal the thread to stop""" 
     self._run = False      



if __name__=='__main__': 
    class TestThread(CallableThread): 
     u"""Increment a counter on each call and print the value""" 
     counter = 0 
     @CallableThread.blocking_method 
     def increment(self, tag): 
      print "{0} FROM: {1}".format(self.counter, tag) 
      self.counter += 1 

    class ThreadClient(threading.Thread): 
     def __init__(self, callable_thread, tag): 
      self.callable_thread = callable_thread 
      self.tag = tag 
      super(ThreadClient, self).__init__() 

     def run(self): 
      for i in range(0, 4): 
       self.callable_thread.increment(self.tag) 

    t = TestThread() 
    t.start() 
    clients = [ThreadClient(t, i) for i in range(0, 10)] 
    for client in clients: 
     client.start() 
##  client.join() 
    for client in clients: 
     client.join() 
    t.stop() 

Comme vous pouvez sans doute le voir, j'utilise des méthodes statiques comme décorateurs. Les décorateurs prennent la méthode à laquelle ils sont appliqués et renvoient une fonction qui met en file d'attente la fonction décorée avec les arguments avec lesquels elle est appelée, une instance threading.Condition à notify et une instance collections.deque pour corriger le résultat.

Des suggestions? Je suis particulièrement intéressé par le nommage, les points architecturaux et la robustesse

EDIT: certaines modifications que j'ai faites sur la base de suggestions alors que j'étais loin d'un interpréteur ont cassé le code, donc je l'ai juste corrigé.

+1

Je pense que vous avez une faute de frappe impliquant _enqueue_call_permission (dans def run) – cpf

Répondre

2

Vous pouvez utiliser

def register(self, *args, **kwargs) 

au lieu de déballer l'auto de args. Aussi, pourquoi utiliser les majuscules pour _RUN?

+0

Bon point sur ne pas déballer soi-même. Longue histoire sur _RUN étant en majuscules. Il a commencé comme un global dans un autre module qui s'appliquait à tous les threads et je capitalisais toujours de telles variables. Je vais aller de l'avant et le changer parce que cela semble anormal maintenant que vous le mentionnez – aaronasterling