2010-06-25 16 views
6

je vais avoir des problèmes de blocage avec ce morceau de code:Python multiprocessing.Queue sur les interblocages PUT et obtenir


def _entropy_split_parallel(data_train, answers_train, weights): 
    CPUS = 1 #multiprocessing.cpu_count() 
    NUMBER_TASKS = len(data_train[0]) 
    processes = [] 

    multi_list = zip(data_train, answers_train, weights) 

    task_queue = multiprocessing.Queue() 
    done_queue = multiprocessing.Queue() 

    for feature_index in xrange(NUMBER_TASKS): 
     task_queue.put(feature_index) 

    for i in xrange(CPUS): 
     process = multiprocessing.Process(target=_worker, 
       args=(multi_list, task_queue, done_queue)) 
     processes.append(process) 
     process.start() 

    min_entropy = None 
    best_feature = None 
    best_split = None 
    for i in xrange(NUMBER_TASKS): 
     entropy, feature, split = done_queue.get() 
     if (entropy < min_entropy or min_entropy == None) and entropy != None: 
      best_feature = feature 
      best_split = split 

    for i in xrange(CPUS): 
     task_queue.put('STOP') 

    for process in processes: 
     process.join() 

    return best_feature, best_split 


def _worker(multi_list, task_queue, done_queue): 
    feature_index = task_queue.get() 
    while feature_index != 'STOP': 
     result = _entropy_split3(multi_list, feature_index) 
     done_queue.put(result) 
     feature_index = task_queue.get() 

Quand je lance mon programme, il fonctionne très bien pour plusieurs courses à travers _entropy_split_parallel, mais finalement les blocages. Le processus parent bloque le done_queue.get() et le processus de travail bloque le done_queue.put(). Comme la file d'attente est toujours vide lorsque cela se produit, le blocage sur get est attendu. Ce que je ne comprends pas, c'est pourquoi le travailleur bloque put, puisque la file d'attente n'est évidemment pas pleine (c'est vide!). J'ai essayé les arguments de mot-clé block et timeout, mais obtiens le même résultat. J'utilise le backport multitraitement, puisque je suis coincé avec Python 2.5.


EDIT: Il semble que je reçois également des problèmes de blocage avec l'un des exemples fournis avec le module multitraitement. C'est le troisième exemple à partir du bas here. La superconduite ne semble se produire que si j'appelle la méthode de test plusieurs fois. Par exemple, en changeant le fond du script à ceci:


if __name__ == '__main__': 
    freeze_support() 
    for x in xrange(1000): 
     test() 

EDIT: Je sais que c'est une vieille question, mais des tests montrent que ce ne soit plus un problème sur Windows avec Python 2.7. Je vais essayer Linux et rendre compte.

Répondre

0

Ce problème a disparu avec les versions plus récentes de Python, donc je suppose que c'était un problème avec le backport. De toute façon, ce n'est plus un problème.

4

Je pense que le problème est le thread parent rejoignant un thread fils auquel il a passé une file d'attente. Ceci est discuté le programming guidelines section du module de multi-traitement. Quoi qu'il en soit, j'ai rencontré le même symptôme que celui que vous avez décrit, et quand j'ai refaçonné ma logique pour que le thread principal ne rejoigne pas les threads fils, il n'y avait pas de blocage. Ma logique refacturée impliquait de connaître le nombre d'éléments que je devrais obtenir à partir des résultats ou de la file d'attente «terminée» (qui peut être prédite en fonction du nombre de threads enfants ou du nombre d'éléments dans la file d'attente, etc.). infiniment jusqu'à ce que tout cela ait été recueilli.

« Toy » illustration de la logique:

num_items_expected = figure_it_out(work_queue, num_threads) 
items_received = [] 
while len(items_received) < num_items_expected: 
    items_received.append(done_queue.get()) 
    time.sleep(5) 

La logique ci-dessus évite la nécessité pour le fil de parent pour rejoindre le thread enfant, permet encore le fil de parent pour bloquer jusqu'à ce que tous les enfants sont faits. Cette approche a évité mes problèmes d'impasse.

+0

Je pense que toutes les files d'attente doivent être vides lorsque les processus sont joints, cela ne devrait donc pas poser de problème. De plus, le processus maître est bloqué sur place, plutôt que de rejoindre. J'ai juste mis à jour Python (j'étais coincé avec une ancienne version), donc je vais tester cela à nouveau. – ajduff574

+0

@ajduff dans mon cas, l'impasse ne s'est pas produite sur la jointure, mais la mise aussi, sauf que la mise était dans le fil de l'enfant. De plus, dans mon cas, la file d'attente était vide. Donc, je pense que cela vaut la peine d'être photographié (c'est-à-dire, en évitant le thread principal qui rejoint les fils de l'enfant) dans votre cas aussi. – Jeet