Je travaille sur une sous-classe de threading.Thread
qui permet d'appeler et d'exécuter ses méthodes dans le thread représenté par l'objet sur lequel elles sont appelées par opposition au comportement habituel . Je fais cela en utilisant des décorateurs sur la méthode cible qui place l'appel à la méthode dans un collections.deque
et en utilisant la méthode run
pour traiter le deque.La fonction python ne retourne pas sauf si la dernière instruction est lente
la méthode run
utilise une instruction while not self.__stop:
et un objet threading.Condition
d'attendre un appel à être placé dans le deque puis appelez self.__process_calls
. La partie else
de la boucle while
effectue un dernier appel à __process_calls
. si self.__stop
, une exception est levée sur toute tentative d'appel de l'une des méthodes 'callable' à partir d'un autre thread.
Le problème est que __process_calls
ne peut pas retourner à moins que la dernière instruction est un print
que j'ai découvert pendant le débogage. J'ai essayé a = 1
et un return
explicite mais aucun travail. avec n'importe quelle instruction print
comme l'instruction finale de la fonction cependant, il renvoie et le thread ne se bloque pas. Avez-vous une idée de ce qui passe?
EDIT: Il a été souligné par David Zaslavsky que l'impression fonctionne parce qu'il prend un certain temps et je l'ai confirmé que
Le code est un peu long, mais je l'espère, mon explication est au-dessus assez clair pour comprendre il.
import threading
import collections
class BrokenPromise(Exception): pass
class CallableThreadError(Exception): pass
class CallToNonRunningThreadError(CallableThreadError): pass
class Promise(object):
def __init__(self, deque, condition):
self._condition = condition
self._deque = deque
def read(self, timeout=None):
if not self._deque:
with self._condition:
if timeout:
self._condition.wait(timeout)
else:
self._condition.wait()
if self._deque:
value = self._deque.popleft()
del self._deque
del self._condition
return value
else:
raise BrokenPromise
def ready(self):
return bool(self._deque)
class CallableThread(threading.Thread):
def __init__(self, *args, **kwargs):
# _enqueued_calls is used to store tuples that encode a function call.
# It is processed by the run method
self.__enqueued_calls = collections.deque()
# _enqueue_call_permission is for callers to signal that they have
# placed something on the queue
self.__enqueue_call_permission = threading.Condition()
self.__stop = False
super(CallableThread, self).__init__(*args, **kwargs)
@staticmethod
def blocking_method(f):
u"""A decorator function to implement a blocking method on a thread"""
# the returned function enqueues the decorated function and blocks
# until the decorated function# is called and returns. It then returns
# the value unmodified. The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on
f = CallableThread.nonblocking_method_with_promise(f)
def register(self, *args, **kwargs):
p = f(self, *args, **kwargs)
return p.read()
return register
@staticmethod
def nonblocking_method_with_promise(f):
u"""A decorator function to implement a non-blocking method on a
thread
"""
# the returned function enqueues the decorated function and returns a
# Promise object.N The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on.
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response_deque = collections.deque()
self.__push_call(f, args, kwargs, response_deque, call_complete)
return Promise(response_deque, call_complete)
return register
@staticmethod
def nonblocking_method(f):
def register(self, *args, **kwargs):
self.__push_call(f, args, kwargs)
return register
def run(self):
while not self.__stop: # while we've not been killed
with self.__enqueue_call_permission:
# get the condition so that we can wait on it if we need too.
if not self.__enqueued_calls:
self.__enqueue_call_permission.wait()
self.__process_calls()
else:
# if we exit because self._run == False, finish processing
# the pending calls if there are any
self.__process_calls()
def stop(self):
u""" Signal the thread to stop"""
with self.__enqueue_call_permission:
# we do this in case the run method is stuck waiting on an update
self.__stop = True
self.__enqueue_call_permission.notify()
def __process_calls(self):
print "processing calls"
while self.__enqueued_calls:
((f, args, kwargs),
response_deque, call_complete) = self.__enqueued_calls.popleft()
if call_complete:
with call_complete:
response_deque.append(f(self, *args, **kwargs))
call_complete.notify()
else:
f(self, *args, **kwargs)
# this is where you place the print statement if you want to see the
# behavior
def __push_call(self, f, args, kwargs, response_deque=None,
call_complete=None):
if self.__stop:
raise CallToNonRunningThreadError(
"This thread is no longer accepting calls")
with self.__enqueue_call_permission:
self.__enqueued_calls.append(((f, args, kwargs),
response_deque, call_complete))
self.__enqueue_call_permission.notify()
#if __name__=='__main__': i lost the indent on the following code in copying but
#it doesn't matter in this context
class TestThread(CallableThread):
u"""Increment a counter on each call and print the value"""
counter = 0
@CallableThread.nonblocking_method_with_promise
def increment(self):
self.counter += 1
return self.counter
class LogThread(CallableThread):
@CallableThread.nonblocking_method
def log(self, message):
print message
l = LogThread()
l.start()
l.log("logger started")
t = TestThread()
t.start()
l.log("test thread started")
p = t.increment()
l.log("promise aquired")
v = p.read()
l.log("promise read")
l.log("{0} read from promise".format(v))
l.stop()
t.stop()
l.join()
t.join()
Sans faire une analyse complète, ma première supposition serait que 'œuvres print', car il faut un temps relativement long, par rapport à quelque chose comme' a = 1'. Vous pouvez essayer d'utiliser quelque chose comme 'time.sleep (0.01)' à la place et je pense que cela devrait avoir le même effet, mais sans produire de sortie. Bien sûr, c'est juste une solution de contournement. Je ne peux pas deviner pourquoi '__process_calls' aurait besoin de ce délai à la fin pour retourner correctement. –
@David. Cela fonctionne très bien. Bien sûr, je suis toujours très curieux de savoir ce qui se passe ici. Je vais essayer de faire bouillir le code à quelque chose de plus simple @bstpierre, vous avez raison. J'ai modifié le code pour corriger cela. Je peux être un peu spacy parfois et je n'avais pas exécuté cette partie du code depuis que je l'ai refactorisé. J'ai vraiment besoin d'écrire des tests unitaires pour cette chose. – aaronasterling
Lorsque vous tuez, obtenez-vous une pile d'appels? Quel est le lieu d'appel? – bstpierre