2010-08-04 15 views
7

J'essaie d'écrire un robot d'indexation multithread.Utilisation de ThreadPool Java

Ma principale classe d'entrée a le code suivant:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null) 
     return; 
exec.execute(new URLCrawler(this, url)); 
} 

Le URLCrawler récupère l'URL spécifiée, analyse le code HTML extrait des liens de celui-ci, et les horaires des liens invisibles retour à la frontière.

Une frontière est une file d'URL non explorée. Le problème est de savoir comment écrire la méthode get(). Si la file d'attente est vide, attendez la fin de URLCrawlers, puis réessayez. Il devrait retourner null seulement quand la file d'attente est vide et il n'y a aucun URLCrawler actuellement actif.

Ma première idée était d'utiliser un AtomicInteger pour compter le nombre actuel de URLCrawlers en cours de travail et un objet auxiliaire pour les appels notifyAll()/wait(). Chaque moteur de balayage au démarrage incrémente le nombre de URLCrawlers en cours et, en fin de session, le décrémente et notifie l'objet qu'il a terminé.

Mais j'ai lu que notify()/notifyAll() et wait() sont des méthodes quelque peu obsolètes pour faire la communication de thread.

Que devrais-je utiliser dans ce modèle de travail? Il est similaire aux producteurs M et N consommateurs, la question est de savoir comment faire face à l'exaustion des producteurs.

Répondre

1

Je pense que l'utilisation de wait/notify est justifiée dans ce cas. Je ne peux pas penser à une façon simple de le faire en utilisant j.c.
Dans une classe, nous appellerons Coordinateur:

private final int numOfCrawlers; 
private int waiting; 

public boolean shouldTryAgain(){ 
    synchronized(this){ 
     waiting++; 
     if(waiting>=numOfCrawlers){ 
      //Everybody is waiting, terminate 
      return false; 
     }else{ 
      wait();//spurious wake up is okay 
      //waked up for whatever reason. Try again 
      waiting--; 
      return true; 
     } 
    } 

public void hasEnqueued(){ 
    synchronized(this){ 
     notifyAll(); 
    } 
} 

puis,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null){ 
     if(!coordinator.shouldTryAgain()){ 
      //all threads are waiting. No possibility of new jobs. 
      return; 
     }else{ 
      //Possible that there are other jobs. Try again 
      continue; 
     } 
    } 
    exec.execute(new URLCrawler(this, url)); 
}//while(true) 
3

Je ne suis pas sûr que je comprends votre conception, mais cela peut être un emploi pour une Semaphore

3

Une option est de faire « frontière » une file d'attente de blocage, donc un thread essayant de « get » de celui-ci va bloquer . Dès que tout autre URLCrawler place des objets dans cette file, tous les autres threads seront automatiquement notifiés (avec l'objet supprimé)

+0

Oui, c'est une solution pour un état stable. Mais comment traiter alors avec la situation quand aucun des URLCrawlers files d'attente des URLs? Avec une file d'attente bloquante, la frontière bloquera à l'infini. –

+0

Dans ce cas, vous pouvez avoir une méthode crawlerDone() sur votre objet frontière qui est appelée chaque fois qu'un UrlCrawler a fini de fonctionner. Cette méthode avec l'approche de compteur que vous avez suggérée, vous pouvez tester (dans votre méthode de frontière) si tous les robots ont fini. Si cela est vrai, get() peut retourner null sans bloquer – naikus

+0

frontier peut être une file d'attente de blocage de capacité fixe. Un bon candidat pour cette capacité est le numberOfCrawlers –

2

Je pense un bloc de construction de base pour votre cas d'utilisation est un « verrou », semblable à CountDownLatch, mais contrairement à CountDownLatch, celui qui permet incrémenter le compte ainsi.

Une interface pour un tel verrou peut être

public interface Latch { 
    public void countDown(); 
    public void countUp(); 
    public void await() throws InterruptedException; 
    public int getCount(); 
} 

Les valeurs légales pour compte seraient 0 et plus. La méthode await() vous permet de bloquer jusqu'à ce que le compte descende à zéro.

Si vous avez un tel loquet, votre cas d'utilisation peut être décrit assez facilement. Je soupçonne aussi que la file d'attente (frontière) peut être éliminée dans cette solution (l'exécuteur en fournit une de toute façon, donc c'est un peu redondant).Je réécrire votre routine principale comme

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers); 
Latch latch = ...; // instantiate a latch 
URL[] initialUrls = ...; 
for (URL url: initialUrls) { 
    executor.execute(new URLCrawler(this, url, latch)); 
} 
// now wait for all crawling tasks to finish 
latch.await(); 

Votre URLCrawler utiliserait le verrou de cette manière:

public class URLCrawler implements Runnable { 
    private final Latch latch; 

    public URLCrawler(..., Latch l) { 
     ... 
     latch = l; 
     latch.countUp(); // increment the count as early as possible 
    } 

    public void run() { 
     try { 
      List<URL> secondaryUrls = crawl(); 
      for (URL url: secondaryUrls) { 
       // submit new tasks directly 
       executor.execute(new URLCrawler(..., latch)); 
      } 
     } finally { 
      // as a last step, decrement the count 
      latch.countDown(); 
     } 
    } 
} 

En ce qui concerne les implémentations de verrouillage, il peut y avoir un certain nombre d'implémentations possibles, allant de celui qui est basé sur wait() et notifyAll(), qui utilise Lock et Condition, pour une implémentation utilisant AbstractQueuedSynchronizer. Je pense que toutes ces implémentations seraient assez simples. Notez que la version wait() - notifyAll() et la version Lock-Condition seraient basées sur une exclusion mutuelle, alors que la version AQS utiliserait CAS (compare-and-swap), et pourrait donc mieux évoluer dans certaines situations.

+0

Votre verrou personnalisé ressemble beaucoup à un sémaphore ... Pourquoi ne pas en utiliser un? – assylias

+0

Oui, il y a des similitudes à coup sûr. Une chose qui manque au sémaphore de vanille est la méthode await() au-dessus de laquelle les termes de sémaphore peuvent bloquer jusqu'à ce que tous les permis soient libérés.On peut probablement créer cela en combinant un sémaphore et un compte à rebours. – sjlee

0

Je voudrais suggérer un AdaptiveExecuter. En fonction d'une valeur caractéristique, vous pouvez choisir de sérialiser ou de paralléliser un thread pour l'exécution. Dans l'exemple ci-dessous, PUID est une chaîne/un objet que je voulais utiliser pour prendre cette décision. Vous pouvez modifier la logique en fonction de votre code. Certaines parties du code sont commentées pour permettre d'autres expériences.

classe AdaptiveExecutor implémente Executor { final Queue tasks = new LinkedBlockingQueue(); Exécutable actif; // ExecutorService threadExecutor = Executors.newCachedThreadPool(); static ExecutorService threadExecutor = Executors.newFixedThreadPool (4);

AdaptiveExecutor() { 
    System.out.println("Initial Queue Size=" + tasks.size()); 
} 

public void execute(final Runnable r) { 
    /* if immediate start is needed do either of below two 
    new Thread(r).start(); 

    try { 
     threadExecutor.execute(r); 
    } catch(RejectedExecutionException rEE) { 
     System.out.println("Thread Rejected " + new Thread(r).getName()); 
    } 

    */ 


    tasks.offer(r); // otherwise, queue them up 
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel. 
    /* 
    tasks.offer(new Runnable() { 
     public void run() { 
      try { 
       r.run(); 
      } finally { 
       scheduleNext(); 
      } 
     } 
    }); 
    */ 
    if ((active == null)&& !tasks.isEmpty()) { 
     active = tasks.poll(); 
     try { 
      threadExecutor.submit(active); 
     } catch (RejectedExecutionException rEE) { 
      System.out.println("Thread Rejected " + new Thread(r).getName()); 
     } 
    } 

    /* 
    if ((active == null)&& !tasks.isEmpty()) { 
     scheduleNext(); 
    } else tasks.offer(r); 
    */ 
    //tasks.offer(r); 

    //System.out.println("Queue Size=" + tasks.size()); 

} 

private void serialize(Thread th) { 
    try { 
     Thread activeThread = new Thread(active); 

     th.wait(200); 
     threadExecutor.submit(th); 
    } catch (InterruptedException iEx) { 

    } 
    /* 
    active=tasks.poll(); 
    System.out.println("active thread is " + active.toString()); 
    threadExecutor.execute(active); 
    */ 
} 

private void parallalize() { 
    if(null!=active) 
     threadExecutor.submit(active); 
} 

protected void scheduleNext(Thread r) { 
    //System.out.println("scheduleNext called") ; 
    if(false==compareKeys(r,new Thread(active))) 
     parallalize(); 
    else serialize(r); 
} 

private boolean compareKeys(Thread r, Thread active) { 
    // TODO: obtain names of threads. If they contain same PUID, serialize them. 
    if(null==active) 
     return true; // first thread should be serialized 
    else return false; //rest all go parallel, unless logic controlls it 
} 

}

2

La question est un peu vieux, mais je pense avoir trouvé une solution simple, travail:

étendez la classe de ThreadPoolExecutor comme ci-dessous. La nouvelle fonctionnalité conserve le nombre de tâches actives (malheureusement, à condition que getActiveCount() ne soit pas fiable). Si taskCount.get() == 0 et qu'il n'y a plus de tâches en attente, cela signifie qu'il n'y a rien à faire et que l'exécuteur s'arrête. Vous avez vos critères de sortie. En outre, si vous créez votre exécuteur testamentaire, mais ne parviennent pas à présenter toutes les tâches, il ne bloquera pas:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor { 

    private final AtomicInteger taskCount = new AtomicInteger(); 

    public CrawlingThreadPoolExecutor() { 
     super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 

     super.beforeExecute(t, r); 
     taskCount.incrementAndGet(); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 

     super.afterExecute(r, t); 
     taskCount.decrementAndGet(); 
     if (getQueue().isEmpty() && taskCount.get() == 0) { 
      shutdown(); 
     } 
    } 
} 

une chose que vous devez faire est de mettre en œuvre votre Runnable d'une manière qu'il conserve référence à Executor que vous utilisez afin de pouvoir soumettre de nouvelles tâches. Voici un faux:

public class MockFetcher implements Runnable { 

    private final String url; 
    private final Executor e; 

    public MockFetcher(final Executor e, final String url) { 
     this.e = e; 
     this.url = url; 
    } 

    @Override 
    public void run() { 
     final List<String> newUrls = new ArrayList<>(); 
     // Parse doc and build url list, and then: 
     for (final String newUrl : newUrls) { 
      e.execute(new MockFetcher(this.e, newUrl)); 
     } 
    } 
}