2009-09-30 10 views
2

J'ai besoin de manipuler deux files d'attente et atomiquement ne suis pas sûr quelle est la stratégie de synchronisation correcte: Ceci est ce que je voulais:Deux BlockingQueue - impasse

public class transfer { 

    BlockingQueue firstQ; 
    BlockingQueue secondQ; 

    public moveToSecond() { 
     synchronized (this){ 
      Object a = firstQ.take(); 
      secondQ.put(a) 
     } 
    } 

    public moveToFirst() { 
     synchronized(this) { 
      Object a = secondQ.take(); 
      firstQ.put(a); 
     } 
    } 
} 

Est-ce le bon modèle? Dans la méthode moveToSecond(), si firstQ est vide, la méthode attendra sur firstQ.take(), mais elle conserve le verrou sur cet objet. Cela empêchera moveToFirst() d'avoir une chance de s'exécuter.

Je suis confus au sujet de la libération de verrou pendant une attente - Le fil libère-t-il tous les verrous [à la fois ceci et BlockedQUeue lock?]? Quel est le modèle correct pour fournir l'atomicité traitant de plusieurs files d'attente de blocage?

Répondre

0

Vous devez utiliser le mécanisme de verrouillage de java.util.concurrency, comme ceci:

Lock lock = new ReentrantLock(); 
.... 
lock.lock(); 
try { 
    secondQ.put(firstQ.take()); 
} finally { 
    lock.unlock(); 
} 

Faites la même chose pour firstQ.put (secondQ.take()), en utilisant le même objet de verrouillage.

Il n'est plus nécessaire d'utiliser les méthodes wait/notify de bas niveau dans la classe Object, sauf si vous écrivez de nouvelles primitives de concurrence.

+1

Le verrouillage a été recommandé en Java 1.5 sur wait() pour des raisons d'efficacité. En Java 1.6, ils ont presque le même niveau d'efficacité. Les futures optimisations de VM rendront probablement les implémentations wait/notify plus rapides. –

+0

Mais le verrouillage est beaucoup plus facile à utiliser correctement. – JesperE

+0

Comment cela résout-il le problème de l'OP de la première file d'attente étant vide? Le code bloquera toujours indéfiniment sur la méthode take(). – Adamski

0

Dans votre code, alors que le fil est bloqué sur BlockingQueue.take(), il s'accroche au verrouillage this. Le verrou n'est libéré que lorsque le code quitte le bloc synchronisé ou que this.wait() est appelé.

Ici, je suppose que moveToFirst() et moveToSecond() devraient bloquer, et que votre classe contrôle tous les accès aux files d'attente.

private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue(); 
private final Semaphore firstSignal = new Semaphore(0); 
private final BlockingQueue<Object> secondQ = LinkedBlockingQueue(); 
private final Semaphore secondSignal = new Semaphore(0); 

private final Object monitor = new Object(); 

public void moveToSecond() { 
    int moved = 0; 
    while (moved == 0) { 

    // bock until someone adds to the queue 
    firstSignal.aquire(); 

    // attempt to move an item from one queue to another atomically 
    synchronized (monitor) { 
     moved = firstQ.drainTo(secondQ, 1); 
    } 
    } 
} 

public void putInFirst(Object object) { 
    firstQ.put(object); 

    // notify any blocking threads that the queue has an item 
    firstSignal.release(); 
} 

Vous auriez un code similaire pour moveToFirst() et putInSecond(). Le while n'est nécessaire que si un autre code peut supprimer des éléments de la file d'attente. Si vous voulez que la méthode qui supprime dans la file d'attente pour attendre les mouvements en attente, il devrait acquérir un permis du sémaphore, et le sémaphore devrait être créé comme un bon sémaphore, donc le premier thread à appeler aquire sera publié en premier:

firstSignal = new Semaphore(0, true); 

Si vous ne voulez pas vous bloquer moveToFirst() avez quelques options

  1. ont la méthode ne faire son travail dans un Runnable envoyé à un Executor
  2. Passer un délai d'attente pour moveToFirst() et utiliser BlockingQueue.poll(int, TimeUnit)
  3. Utilisez BlockingQueue.drainTo(secondQ, 1) et modifiez moveToFirst() pour renvoyer un booléen pour indiquer s'il a réussi.

Pour les trois options ci-dessus, vous n'avez pas besoin du sémaphore.

Enfin, je doute de la nécessité de rendre le mouvement atomique. Si plusieurs threads ajoutent ou suppriment des files d'attente, une file d'attente d'observation ne serait pas capable de dire si moveToFirst() était atomique.

2

Vous utilisez l'approche correcte en utilisant un mutex commun pour synchroniser entre les deux files d'attente.Cependant, pour éviter la situation que vous décrivez avec la première file d'attente étant vide, je suggérerais de réimplémenter moveToFirst() et moveToSecond() pour utiliser poll() plutôt que take(); par exemple.

public void boolean moveToFirst() { 
    // Synchronize on simple mutex; could use a Lock here but probably 
    // not worth the extra dev. effort. 
    synchronzied(queueLock) { 
    boolean success; 

    // Will return immediately, returning null if the queue is empty. 
    Object o = firstQ.poll(); 

    if (o != null) { 
     // Put could block if the queue is full. If you're using a bounded 
     // queue you could use add(Object) instead to avoid any blocking but 
     // you would need to handle the exception somehow. 
     secondQ.put(o); 
     success = true; 
    } else { 
     success = false; 
    } 
    } 

    return success; 
} 
1

Une autre condition d'échec que vous n'avez pas mentionné est si firstQ n'est pas vide, mais secondQ est pleine, l'article sera retiré de firstQ mais il n'y aura pas de place pour le mettre. La seule façon correcte consiste donc à utiliser poll et offre avec les timeouts et le code pour retourner les choses telles qu'elles étaient avant chaque échec (important!), Puis réessayer après un temps aléatoire jusqu'à ce que les deux pollen et offre réussissent.

Ceci est une approche optimiste; efficace en fonctionnement normal mais très inefficace lorsque les blocages sont fréquents (la latence moyenne dépend du délai choisi)