2010-07-30 23 views
3

Je travaille sur une sous-classe de threading.Thread qui permet d'appeler et d'exécuter ses méthodes dans le thread représenté par l'objet sur lequel elles sont appelées par opposition au comportement habituel . Je fais cela en utilisant des décorateurs sur la méthode cible qui place l'appel à la méthode dans un collections.deque et en utilisant la méthode run pour traiter le deque.La fonction python ne retourne pas sauf si la dernière instruction est lente

la méthode run utilise une instruction while not self.__stop: et un objet threading.Condition d'attendre un appel à être placé dans le deque puis appelez self.__process_calls. La partie else de la boucle while effectue un dernier appel à __process_calls. si self.__stop, une exception est levée sur toute tentative d'appel de l'une des méthodes 'callable' à partir d'un autre thread.

Le problème est que __process_calls ne peut pas retourner à moins que la dernière instruction est un print que j'ai découvert pendant le débogage. J'ai essayé a = 1 et un return explicite mais aucun travail. avec n'importe quelle instruction print comme l'instruction finale de la fonction cependant, il renvoie et le thread ne se bloque pas. Avez-vous une idée de ce qui passe?

EDIT: Il a été souligné par David Zaslavsky que l'impression fonctionne parce qu'il prend un certain temps et je l'ai confirmé que

Le code est un peu long, mais je l'espère, mon explication est au-dessus assez clair pour comprendre il.

import threading 
import collections  

class BrokenPromise(Exception): pass  
class CallableThreadError(Exception): pass  
class CallToNonRunningThreadError(CallableThreadError): pass 


class Promise(object): 
    def __init__(self, deque, condition): 
     self._condition = condition 
     self._deque = deque 

    def read(self, timeout=None): 
     if not self._deque: 
      with self._condition: 
       if timeout: 
        self._condition.wait(timeout) 
       else: 
        self._condition.wait() 
     if self._deque: 
      value = self._deque.popleft() 
      del self._deque 
      del self._condition 
      return value 
     else: 
      raise BrokenPromise 

    def ready(self): 
     return bool(self._deque) 

class CallableThread(threading.Thread): 
    def __init__(self, *args, **kwargs): 
     # _enqueued_calls is used to store tuples that encode a function call. 
     # It is processed by the run method 
     self.__enqueued_calls = collections.deque() 
     # _enqueue_call_permission is for callers to signal that they have 
     # placed something on the queue 
     self.__enqueue_call_permission = threading.Condition() 
     self.__stop = False 
     super(CallableThread, self).__init__(*args, **kwargs) 

    @staticmethod 
    def blocking_method(f): 
     u"""A decorator function to implement a blocking method on a thread""" 
     # the returned function enqueues the decorated function and blocks 
     # until the decorated function# is called and returns. It then returns 
     # the value unmodified. The code in register runs in the calling thread 
     # and the decorated method runs in thread that it is called on 
     f = CallableThread.nonblocking_method_with_promise(f) 
     def register(self, *args, **kwargs): 
      p = f(self, *args, **kwargs) 
      return p.read() 
     return register 

    @staticmethod 
    def nonblocking_method_with_promise(f): 
     u"""A decorator function to implement a non-blocking method on a 
     thread 
     """ 
     # the returned function enqueues the decorated function and returns a 
     # Promise object.N The code in register runs in the calling thread 
     # and the decorated method runs in thread that it is called on. 
     def register(self, *args, **kwargs): 
      call_complete = threading.Condition() 
      response_deque = collections.deque() 
      self.__push_call(f, args, kwargs, response_deque, call_complete) 
      return Promise(response_deque, call_complete) 
     return register 

    @staticmethod 
    def nonblocking_method(f): 
     def register(self, *args, **kwargs): 
      self.__push_call(f, args, kwargs) 
     return register 

    def run(self):   
     while not self.__stop: # while we've not been killed 
      with self.__enqueue_call_permission: 
       # get the condition so that we can wait on it if we need too. 
       if not self.__enqueued_calls: 
        self.__enqueue_call_permission.wait() 
      self.__process_calls() 
     else: 
      # if we exit because self._run == False, finish processing 
      # the pending calls if there are any 
      self.__process_calls() 

    def stop(self): 
     u""" Signal the thread to stop""" 
     with self.__enqueue_call_permission: 
      # we do this in case the run method is stuck waiting on an update 
      self.__stop = True 
      self.__enqueue_call_permission.notify() 

    def __process_calls(self): 
     print "processing calls" 
     while self.__enqueued_calls: 
      ((f, args, kwargs), 
      response_deque, call_complete) = self.__enqueued_calls.popleft() 
      if call_complete: 
       with call_complete: 
        response_deque.append(f(self, *args, **kwargs)) 
        call_complete.notify() 
      else: 
       f(self, *args, **kwargs) 
     # this is where you place the print statement if you want to see the 
     # behavior   

    def __push_call(self, f, args, kwargs, response_deque=None, 
        call_complete=None): 
     if self.__stop: 
      raise CallToNonRunningThreadError(
        "This thread is no longer accepting calls") 
     with self.__enqueue_call_permission: 
      self.__enqueued_calls.append(((f, args, kwargs), 
              response_deque, call_complete)) 
      self.__enqueue_call_permission.notify() 


#if __name__=='__main__':  i lost the indent on the following code in copying but 
#it doesn't matter in this context 
class TestThread(CallableThread): 
    u"""Increment a counter on each call and print the value""" 
    counter = 0 

    @CallableThread.nonblocking_method_with_promise 
    def increment(self): 
     self.counter += 1 
     return self.counter 

class LogThread(CallableThread): 

    @CallableThread.nonblocking_method 
    def log(self, message): 
     print message 

l = LogThread() 
l.start() 
l.log("logger started") 
t = TestThread() 
t.start() 
l.log("test thread started") 
p = t.increment() 
l.log("promise aquired") 
v = p.read() 
l.log("promise read") 
l.log("{0} read from promise".format(v)) 
l.stop() 
t.stop() 
l.join() 
t.join() 
+1

Sans faire une analyse complète, ma première supposition serait que 'œuvres print', car il faut un temps relativement long, par rapport à quelque chose comme' a = 1'. Vous pouvez essayer d'utiliser quelque chose comme 'time.sleep (0.01)' à la place et je pense que cela devrait avoir le même effet, mais sans produire de sortie. Bien sûr, c'est juste une solution de contournement. Je ne peux pas deviner pourquoi '__process_calls' aurait besoin de ce délai à la fin pour retourner correctement. –

+0

@David. Cela fonctionne très bien. Bien sûr, je suis toujours très curieux de savoir ce qui se passe ici. Je vais essayer de faire bouillir le code à quelque chose de plus simple @bstpierre, vous avez raison. J'ai modifié le code pour corriger cela. Je peux être un peu spacy parfois et je n'avais pas exécuté cette partie du code depuis que je l'ai refactorisé. J'ai vraiment besoin d'écrire des tests unitaires pour cette chose. – aaronasterling

+0

Lorsque vous tuez, obtenez-vous une pile d'appels? Quel est le lieu d'appel? – bstpierre

Répondre

1
  1. __process_calls modifie __enqueued_calls sans posséder la serrure. Cela peut créer une condition de concurrence. : Deque peut être "threadsafe" (c'est-à-dire non corrompu par les accès au thread), mais la vérification de son état doit encore être verrouillée.

  2. La condition d'arrêt n'est pas sûre.

Commentaires en ligne:

def run(self):   
    while not self.__stop: # while we've not been killed 
     with self.__enqueue_call_permission: 
      # get the condition so that we can wait on it if we need too. 
      ### should be checking __stop here, it could have been modified before 
      ### you took the lock. 
      if not self.__enqueued_calls: 
       self.__enqueue_call_permission.wait() 
     self.__process_calls() 
    else: 
     # if we exit because self._run == False, finish processing 
     # the pending calls if there are any 
     self.__process_calls() 
+0

Je l'ai essayé et non. le manuel dit que la méthode 'popleft' sur' deque' est threadsafe. '__enqueue_call_permission' est juste une méthode pour' __push_call' de signaler à 'run' qu'il y a un nouvel appel sur' deque'. Je devrais probablement le renommer. – aaronasterling

+0

Il était 3. Je ne sais toujours pas comment cela fonctionne. Mon runthrough mental indique que cette erreur se traduirait par 'run' suspendu à l'appel de' __enqueue_call_permission.wait() 'mais ce n'est pas du tout le comportement qui s'est manifesté. En tout cas, c'est réglé maintenant. – aaronasterling

+0

J'ai eu la même pensée. Cela m'a semblé faux, mais je ne pouvais pas trouver un scénario d'échec qui correspondait à vos symptômes, mais je suis content que ce soit corrigé. – bstpierre