2010-07-27 8 views
12

Ceci est le code pour créer un thread_group et exécuter toutes les discussions en parallèle:Comment faire boost :: thread_group exécuter un nombre fixe de fils parallèles

boost::thread_group group; 
for (int i = 0; i < 15; ++i) 
    group.create_thread(aFunctionToExecute); 
group.join_all(); 

Ce code exécute toutes les discussions à la fois. Ce que je veux faire est de les exécuter tous sauf 4 maximum en parallèle. Quand on est terminé, un autre est exécuté jusqu'à ce qu'il n'y ait plus rien à exécuter.

Répondre

3

Une autre solution, plus efficace, consisterait à avoir chaque rappel de thread dans le thread primaire lorsqu'ils sont terminés, et le gestionnaire du thread primaire pourrait lancer un nouveau thread à chaque fois. Cela empêche les appels répétitifs à timed_join, car le thread primaire ne fera rien tant que le rappel n'est pas déclenché.

+1

Enfin finir avec quelque chose comme ceci: J'ai un threadpool dans lequel je enregistre tous les travaux. Ensuite, je crée les n threads et passe en argument à chaque thread le pool de threads. Chaque thread vérifie s'il reste des jobs. Si oui, il suffit d'avoir un travail à exécuter. Sinon, le fil se termine. De cette façon, nous créons simplement n threads et non un thread par tâche (un travail se termine, un nouveau thread est créé). –

0

J'ai quelque chose comme ceci:

boost::mutex mutex_; 
    boost::condition_variable condition_; 
    const size_t throttle_; 
    size_t size_; 
    bool wait_; 
    template <typename Env, class F> 
    void eval_(const Env &env, const F &f) { 
     { 
      boost::unique_lock<boost::mutex> lock(mutex_); 
      size_ = std::min(size_+1, throttle_); 
      while (throttle_ <= size_) condition_.wait(lock); 
     } 
     f.eval(env); 
     { 
      boost::lock_guard<boost::mutex> lock(mutex_); 
      --size_; 
     } 
     condition_.notify_one(); 
    } 
0

Je pense que vous êtes à la recherche d'une mise en œuvre thread_pool, qui est disponible here.

De plus, j'ai remarqué que si vous créez un vecteur de std :: future et stocke des futures de std :: async_tasks et que vous n'avez aucun code bloquant dans la fonction passée au thread, VS2013 (atleast from ce que je peux confirmer) lancera exactement le nombre approprié de threads que votre machine peut gérer. Il réutilise les threads une fois créés.

0

J'ai créé ma propre interface simplifiée de boost::thread_group pour faire ce travail:

class ThreadGroup : public boost::noncopyable 
{ 
    private: 
     boost::thread_group  group; 
     std::size_t    maxSize; 
     float      sleepStart; 
     float      sleepCoef; 
     float      sleepMax; 
     std::set<boost::thread*> running; 

    public: 
     ThreadGroup(std::size_t max_size = 0, 
        float max_sleeping_time = 1.0f, 
        float sleeping_time_coef = 1.5f, 
        float sleeping_time_start = 0.001f) : 
      boost::noncopyable(), 
      group(), 
      maxSize(max_size), 
      sleepStart(sleeping_time_start), 
      sleepCoef(sleeping_time_coef), 
      sleepMax(max_sleeping_time), 
      running() 
     { 
      if(max_size == 0) 
       this->maxSize = (std::size_t)std::max(boost::thread::hardware_concurrency(), 1u); 
      assert(max_sleeping_time >= sleeping_time_start); 
      assert(sleeping_time_start > 0.0f); 
      assert(sleeping_time_coef > 1.0f); 
     } 

     ~ThreadGroup() 
     { 
      this->joinAll(); 
     } 

     template<typename F> boost::thread* createThread(F f) 
     { 
      float sleeping_time = this->sleepStart; 
      while(this->running.size() >= this->maxSize) 
      { 
       for(std::set<boost::thread*>::iterator it = running.begin(); it != running.end();) 
       { 
        const std::set<boost::thread*>::iterator jt = it++; 
        if((*jt)->timed_join(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time)))) 
         running.erase(jt); 
       } 
       if(sleeping_time < this->sleepMax) 
       { 
        sleeping_time *= this->sleepCoef; 
        if(sleeping_time > this->sleepMax) 
         sleeping_time = this->sleepMax; 
       } 
      } 
      return *this->running.insert(this->group.create_thread(f)).first; 
     } 

     void joinAll() 
     { 
      this->group.join_all(); 
     } 

     void interruptAll() 
     { 
#ifdef BOOST_THREAD_PROVIDES_INTERRUPTIONS 
      this->group.interrupt_all(); 
#endif 
     } 

     std::size_t size() const 
     { 
      return this->group.size(); 
     } 
    }; 

Voici un exemple d'utilisation, très similaire à boost::thread_group avec la principale différence que la création du fil est un point d'attente:

{ 
    ThreadGroup group(4); 
    for(int i = 0; i < 15; ++i) 
    group.createThread(aFunctionToExecute); 
} // join all at destruction