2010-02-19 7 views
2

Je crée un script python threadé qui contient une collection de fichiers placés dans une file d'attente, puis un nombre inconnu de threads (la valeur par défaut est 3) pour lancer le téléchargement. Lorsque chaque thread est terminé, il met à jour stdout avec l'état de la file d'attente et un pourcentage. Tous les fichiers sont en cours de téléchargement mais les informations d'état sont erronées sur le 3ème fil et je ne sais pas pourquoi. J'ai envisagé de créer une file d'attente de travail à utiliser pour le calcul, mais je ne pense pas que je devrais/que cela aurait de l'importance. Quelqu'un pourrait-il me diriger dans la bonne direction ici?Problème de thread/file d'attente Python

download_queue = queue.Queue() 

class Downloader(threading.Thread): 
    def __init__(self,work_queue): 
     super().__init__() 
     self.current_job = 0 
     self.work_queue = work_queue 
     self.queue_size = work_queue.qsize() 

    def run(self): 
     while self.work_queue.qsize() > 0: 
      url = self.work_queue.get(True) 
      system_call = "wget -nc -q {0} -O {1}".format(url,local_file) 
      os.system(system_call) 
      self.current_job = int(self.queue_size) - int(self.work_queue.qsize()) 
      self.percent = (self.current_job/self.queue_size) * 100 
      sys.stdout.flush() 
      status = "\rDownloading " + url.split('/')[-1] + " [status: " + str(self.current_job) + "/" + str(self.queue_size) + ", " + str(round(self.percent,2)) + "%]" 
     finally: 
      self.work_queue.task_done() 
def main: 
    if download_queue.qsize() > 0: 
     if options.active_downloads: 
      active_downloads = options.active_downloads 
     else: 
      active_downloads = 3 
     for x in range(active_downloads): 
      downloader = Downloader(download_queue) 
      downloader.start() 
     download_queue.join() 
+2

Ce code est-il réel? Où imprimez-vous le message d'état? Quelle version de python supporte 'while ... finally'? –

+0

Voir aussi http://stackoverflow.com/questions/1965213/file-downloading-using-python-with-threads –

Répondre

4

Vous ne pouvez pas vérifier la taille de file d'attente dans une instruction, puis .get() de la file d'attente dans l'autre. En attendant, le monde entier a peut-être changé. L'appel de méthode .get() est l'opération atomique unique que vous devez appeler. Si elle soulève Empty ou des blocs, la file d'attente est vide.

Vos threads peuvent écraser la sortie de l'autre. Je voudrais avoir un autre thread avec une file d'attente d'entrée dont le seul travail est d'imprimer les éléments de la file d'attente à stdout. Il peut également compter le nombre d'éléments terminés et produire des informations d'état.

J'ai aussi tendance à ne pas sous-classe Thread, mais seulement l'offre une instance Thread simple avec un paramètre target= et .start() le fil.

en fonction de votre réponse, essayez ceci:

download_queue = queue.Queue() 


class Downloader(threading.Thread): 
    def __init__(self,work_queue, original_size): 
     super().__init__() 
     self.current_job = 0 
     self.work_queue = work_queue 
     self.queue_size = original_size 

    def run(self): 
     while True: 
      try: 
       url = self.work_queue.get(False) 
       system_call = "wget -nc -q {0} -O {1}".format(url,local_file) 
       os.system(system_call) 
       # the following code is questionable. By the time we get here, 
       # many other items may have been taken off the queue. 
       self.current_job = int(self.queue_size) - int(self.work_queue.qsize()) 
       self.percent = (self.current_job/self.queue_size) * 100 
       sys.stdout.flush() 
       status = ("\rDownloading " + url.split('/')[-1] + 
          " [status: " + str(self.current_job) + 
          "/" + str(self.queue_size) + ", " + 
          str(round(self.percent,2)) + "%]")    
      except queue.Empty: 
       pass 
      finally: 
       self.work_queue.task_done() 




def main: 
    if download_queue.qsize() > 0: 
     original_size = download_queue.qsize() 
     if options.active_downloads: 
      active_downloads = options.active_downloads 
     else: 
      active_downloads = 3 
     for x in range(active_downloads): 
      downloader = Downloader(download_queue, original_size) 
      downloader.start() 
     download_queue.join() 
+0

Je suis conscient que les threads écraseront la sortie de l'autre, c'est bien comme c'est supposé le faire. Je veux seulement montrer le dernier fichier qui a été réglé pour être téléchargé et quel est le nombre comparé à la valeur initiale de la taille de la file d'attente. Actuellement, la queuesize est incorrecte sur le 3ème thread (si vous utilisez des valeurs par défaut); il en montre 2 de moins que les deux premiers. Par exemple, voici à quoi ressemble chacune des lignes d'état une fois imprimées: Téléchargement du fichier 1.txt [status: 1/10, 10%] Téléchargement du fichier 2.txt [status: 2/10, 10%] Téléchargement fichier 3.txt [status: 3/8, 37.5%] – MRR0GERS

+0

Oui, au moment où le 3eme worker est démarré, les 2 autres ont traité un item en dehors de la file d'attente ... Vous n'affichez pas le code qui place les éléments dans la file d'attente dans cet extrait, mais c'est probablement là que votre nombre total devrait provenir. Ou simplement stocker la taille totale de la file d'attente AVANT de démarrer des threads, et ne pas le lire dans un thread. –

+0

J'ai écrit le script dans Python3 et cela fonctionne à l'exception de quelques choses. Merci pour votre contribution, je vais modifier mon code avec vos suggestions quand je rentre à la maison plus tard ce soir. – MRR0GERS

2

Si vous souhaitez utiliser le module multiprocessing, il comprend un imap_unordered parallèle très agréable, ce qui réduirait votre problème à la très élégante:

import multiprocessing, sys 

class ParallelDownload: 
    def __init__(self, urls, processcount=3): 
     self.total_items = len(urls) 
     self.pool = multiprocessing.Pool(processcount) 
     for n, status in enumerate(self.pool.imap_unordered(self.download, urls)): 
      stats = (n, self.total_items, n/self.total_items) 
      sys.stdout.write(status + " [%d/%d = %0.2f %%]\n"%stats) 


    def download(self, url): 
     system_call = "wget -nc -q {0} -O {1}".format(url, local_file) 
     os.system(system_call) 
     status = "\rDownloaded " + url.split('/')[-1] 
     return status