2010-10-13 74 views
4

J'ai implémenté un thread-producteur de base thread (thread 1 = producteur, thread 2 = consommateur) en utilisant des threads et des conditions Boost. Je suis coincé dans wait() indéfiniment assez souvent. Je ne peux pas vraiment voir ce qui pourrait être mauvais ici. Ci-dessous quelques pseudo-code:Boost condition de blocage en utilisant wait() dans le code producteur-consommateur

// main class 
class Main { 
public: 
    void AddToQueue(...someData...) 
    { 
    boost::mutex::scoped_lock lock(m_mutex); 
    m_queue.push_back(new QueueItem(...someData...)); 
    m_cond.notify_one(); 
    } 

    void RemoveQueuedItem(...someCond...) 
    { 
    // i'm wondering if this could cause the trouble? 
    boost::mutex::scoped_lock lock(m_mutex); 
    // erase a item matching condition (some code not shown, 
    // but should be fairly self-explanatory -- IsMatch() 
    // simply looks at a flag of QueueItem 
    m_queue.erase(std::remove_if(m_queue.being(), m_queue.end(), 
     boost::bind(&Main::IsMatch, this, _1, someCond), m_queue.end()); 
    } 

    friend void WorkerThread(Main* m); 
private:  
    boost::ptr_deque<QueueItem> m_queue; 
    boost::mutex m_mutex; 
    boost::condition m_cond; 
}; 

// worker thread 
void WorkerThread(Main* m) 
{ 
    typedef boost::ptr_deque<QueueItem>::auto_type RelType; 
    RelType queueItem; 

    while(!shutDown) { 
    { // begin mutex scope 
     boost::mutex::scoped_lock lock(m->m_mutex); 
     while(m->m_queue.empty()) { 
     m->m_cond.wait(lock); // <- stuck here forever quite often! 
     } 
     queueItem = m->m_queue->pop_front(); // pop & take ptr ownership 
    } // end mutex scope 

    // ... do stuff with queueItem 
    // ... 
    // ... queueItem is deleted when it leaves scope & we loop around 
    } 
} 

Quelques informations supplémentaires:

  • En utilisant Boost v1.44
  • problème se produit dans Linux et Android; Je ne suis pas encore sûr si cela se produit dans Windows

Des idées?

MISE À JOUR: Je crois J'ai isolé la question. Je mettrai à jour plus loin une fois confirmé, qui, espérons-le, sera demain.

MISE À JOUR 2: Il s'avère qu'il n'y a pas de problème dans le code décrit ci-dessus. Je dépendais d'une API sous-jacente pour AddToQueue() - lors du traitement des données dans le thread de travail & le remettant à l'API, il avait un bug circulaire où il appelait AddToQueue() à nouveau ... qui est maintenant corrigé ;-)

+0

Votre code ci-dessus n'a pas la partie où vous attendez le sur la variable conditionnelle. Collez-le aussi. – wilx

+0

Dans mon thread de travail (consommateur) ci-dessus, vous verrez un moment (queue_is_empty()) {cond.wait (lock); } Est-ce ce que vous voulez dire? – NuSkooler

Répondre

2

J'ai fait quelque chose de similaire récemment même si le mien utilise la file d'attente STL. Voyez si vous pouvez choisir de ma mise en œuvre. Comme le dit wilx, vous devez attendre à la condition. Mon implémentation a une limite maximum sur les éléments de la file d'attente et j'utilise ça pour attendre que le mutex/guard soit libéré. À l'origine, j'ai fait cela sur Windows avec la possibilité d'utiliser les sections Mutex ou Critical à l'esprit, d'où le paramètre de modèle que vous pouvez supprimer et utiliser directement boost::mutex s'il le simplifie pour vous.

#include <queue> 
#include "Message.h" 
#include <boost/thread/locks.hpp> 
#include <boost/thread/condition.hpp> 

template <typename T> class Queue : private boost::noncopyable 
{ 
public: 
    // constructor binds the condition object to the Q mutex 
    Queue(T & mutex, size_t max_size) : m_max_size(max_size), m_mutex(mutex){} 

    // writes messages to end of Q 
    void put(const Message & msg) 
    { 
    // Lock mutex to ensure exclusive access to Q 
    boost::unique_lock<T> guard(m_mutex); 

    // while Q is full, sleep waiting until something is taken off of it 
    while (m_queue.size() == m_max_size) 
    { 
     cond.wait(guard); 
    } 

    // ok, room on the queue. 
    // Add the message to the queue 
    m_queue.push(msg); 

    // Indicate so data can be ready from Q 
    cond.notify_one(); 
    } 

    // Read message from front of Q. Message is removed from the Q 
    Message get(void) 
    { 
    // Lock mutex to ensure exclusive access to Q 
    boost::unique_lock<T> guard(m_mutex); 

    // If Q is empty, sleep waiting for something to be put onto it 
    while (m_queue.empty()) 
    { 
     cond.wait(guard); 
    } 

    // Q not empty anymore, read the value 
    Message msg = m_queue.front(); 

    // Remove it from the queue 
    m_queue.pop(); 

    // Signal so more data can be added to Q 
    cond.notify_one(); 

    return msg; 
    } 

    size_t max_size(void) const 
    { 
    return m_max_size; 
    } 


private: 
    const size_t m_max_size; 
    T & m_mutex; 
    std::queue<Message> m_queue; 
    boost::condition_variable_any cond; 
}; 

De cette façon, vous pouvez partager la file d'attente entre le producteur et le consommateur. Exemple d'utilisation

boost::mutex mutex; 

Queue<boost::mutex> q(mutex, 100); 

boost::thread_group threads; 

threads.create_thread(Producer<boost::mutex>(q)); 
threads.create_thread(Consumer<boost::mutex>(q)); 

threads.join_all(); 

avec le producteur/consommateur défini comme ci-dessous

template <typename T> class Producer 
{ 
public: 
    // Queue passed in 
    explicit Producer(Queue<T> &q) : m_queue(q) {} 

    void operator()() 
    { 
    } 
} 
+0

Il s'avère que je n'ai pas eu un bug dans la partie producteur-consommateur de mon code (publiera les détails dans le message principal dans un peu). J'apprécie vraiment votre implémentation, je pense que je pourrais considérer certains de mes semblables comme similaires. – NuSkooler

+0

Cheers. Content de faire une contribution. – dubnde

0
m->m_cond.wait(); // <- stuck here forever quite often! 

devrait être:

m->m_cond.wait(lock); 

Vous morts verrouillé votre Classs parce que vous aviez encore le mutex accquired mais vous attendaient. Toute autre méthode veut acquérir le même mutex et attendre que votre ouvrier ne libère jamais le mutex.

+0

Désolé, c'est juste une faute de frappe dans mon pseudo code.Je l'ai corrigé ci-dessus. Le code réel a été implémenté en attente de verrouillage réel (boost :: mutex :: scoped_lock <>). – NuSkooler

+0

Ok alors par curiosité s'il vous plaît poster le code pour ajouter quelque chose à la file d'attente. – Vinzenz

+0

Il est déjà là en tant que file d'attente(). J'ai depuis renommé la fonction dans le pseudo code ci-dessus à AddToQueue() pour plus de clarté. – NuSkooler