2010-05-28 3 views
4

J'ai un cas où de nombreux threads tous génèrent simultanément des données qui sont finalement écrites sur un long flux de fichier. Je dois en quelque sorte sérialiser ces écritures pour que le flux soit écrit dans le bon ordre.Terme standard pour un tampon de réorganisation d'E/S de thread?

ie, I ont une file d'attente d'entrée de 2048 emplois j ..j n, dont chacun produit un bloc de données i o . Les travaux s'exécutent en parallèle sur, disons, huit threads, mais les blocs de sortie doivent apparaître dans le flux dans le même ordre que les blocs d'entrée correspondants — le fichier de sortie doit être dans l'ordre o o ...

la solution à c'est assez évident: je besoin d'une sorte de tampon qui accumule et écrit les blocs de sortie dans l'ordre correct, semblable à un tampon de réordonnancement CPU dans Tomasulo's algorithm, ou à la façon TCP réassemble les paquets en panne avant de les transmettre à la couche application. Avant de passer au code, je voudrais faire une rapide recherche documentaire pour voir s'il y a des papiers qui ont résolu ce problème d'une manière particulièrement intelligente ou efficace, car j'ai de sévères contraintes en temps réel et en mémoire. Je n'arrive pas à trouver des documents décrivant cela; une recherche Scholar sur chaque permutation de [threads, concurrent, reorder buffer, réassemblage, io, sérialiser] n'a rien donné d'utile. Je me sens comme si je ne devais pas chercher les bons termes.

Existe-t-il un nom ou un mot-clé académique commun pour ce type de modèle que je peux rechercher?

Répondre

0

En fait, vous ne devriez pas avoir besoin d'accumuler les morceaux. La plupart des systèmes d'exploitation et des langues fournissent une abstraction de fichier à accès aléatoire qui permettrait à chaque thread d'écrire indépendamment ses données de sortie à la position correcte dans le fichier sans affecter les données de sortie de l'un des autres threads. Ou écrivez-vous vraiment à un fichier de sortie série comme une socket?

+0

Véritable série - un chiffrement de flux. – Crashworks

+0

Votre solution ne fonctionne que si la longueur des enregistrements de sortie est connue avant la fin du traitement. –

0

Je n'utiliserais pas du tout un tampon réorganisable, personnellement. Je créerais un objet 'job' par tâche et, en fonction de votre environnement, j'utiliserais le passage de message ou les mutex pour recevoir les données complétées de chaque travail dans l'ordre. Si le travail suivant n'est pas terminé, votre processus d'écriture attend jusqu'à ce que ce soit le cas.

+0

J'ai peur de ne pas suivre ce que vous voulez dire. Voulez-vous dire que je devrais avoir (m) beaucoup de mutex, un pour chaque travail, et que l'écrivain devrait attendre sur chacun d'eux dans l'ordre croissant? Le problème avec ceci est que je n'ai que de la mémoire pour contenir une vingtaine de travaux à la fois, et si je rencontre le cas où la fenêtre courante se termine dans l'ordre inverse, cela laissera plusieurs threads inactifs jusqu'à la "tête" on termine. – Crashworks

+0

C'est ce que je suggérais, oui. Je ne pense pas que toute autre solution fera mieux si les tâches se déroulent dans l'ordre inverse, à l'exception de la suggestion de Steve, si vos enregistrements sont de longueur connue ou si les résultats sont mis en cache sur le disque. –

0

Je voudrais utiliser un tampon d'anneau qui a la même longueur que le nombre de fils que vous utilisez. Le ringbuffer aurait également le même nombre de mutex.

Le rinbuffer doit également connaître le id du dernier segment qu'il a écrit dans le fichier. C'est l'équivalent de l'index 0 de votre ringbuffer. A l'ajout au ringbuffer, vous vérifiez si vous pouvez écrire, c'est-à-dire que l'index 0 est défini, vous pouvez alors écrire plus d'un morceau à la fois dans le fichier.

Si l'index 0 n'est pas défini, bloquez simplement le thread en attente. - Vous pouvez également avoir un ringbuffer 2 à 3 fois plus long que votre nombre de threads et verrouiller uniquement lorsque cela est approprié, c'est-à-dire: lorsque suffisamment de travaux sont terminés, le buffer a été lancé.

Ne pas oublier de mettre à jour le dernier morceau écrit difficile;)

Vous pouvez également utiliser le double buffering lors de l'écriture dans le fichier.

0

Avoir la file d'attente de sortie contient contrats à terme plutôt que les données réelles. Lorsque vous récupérez un élément de la file d'attente d'entrée, publiez immédiatement le futur correspondant dans la file d'attente de sortie (en veillant à conserver cet ordre --- voir ci-dessous). Lorsque le thread de travail a traité l'élément, il peut ensuite définir la valeur sur le futur. Le thread de sortie peut lire chaque futur de la file d'attente et bloquer jusqu'à ce que ce futur soit prêt. Si les versions ultérieures deviennent prêtes tôt, cela n'affecte en rien le fil de sortie, à condition que les contrats à terme soient en ordre.

Il existe deux façons de s'assurer que les contrats à terme sur la file d'attente de sortie sont dans le bon ordre. La première consiste à utiliser un seul mutex pour lire dans la file d'attente d'entrée et écrire dans la file d'attente de sortie. Chaque thread verrouille le mutex, prend un élément de la file d'attente d'entrée, poste le futur dans la file d'attente de sortie et libère le mutex. Le second est d'avoir un seul thread maître qui lit à partir de la file d'attente d'entrée, publie le futur dans la file d'attente de sortie, puis transmet l'élément à un thread de travail à exécuter.

en C++ avec un seul mutex protéger les files d'attente ceci ressemblera:

#include <thread> 
#include <mutex> 
#include <future> 

struct work_data{}; 
struct result_data{}; 

std::mutex queue_mutex; 
std::queue<work_data> input_queue; 
std::queue<std::future<result_data> > output_queue; 

result_data process(work_data const&); // do the actual work 

void worker_thread() 
{ 
    for(;;) // substitute an appropriate termination condition 
    { 
     std::promise<result_data> p; 
     work_data data; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(input_queue.empty()) 
      { 
       continue; 
      } 
      data=input_queue.front(); 
      input_queue.pop(); 
      std::promise<result_data> item_promise; 
      output_queue.push(item_promise.get_future()); 
      p=std::move(item_promise); 
     } 
     p.set_value(process(data)); 
    } 
} 

void write(result_data const&); // write the result to the output stream 

void output_thread() 
{ 
    for(;;) // or whatever termination condition 
    { 
     std::future<result_data> f; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(output_queue.empty()) 
      { 
       continue; 
      } 
      f=std::move(output_queue.front()); 
      output_queue.pop(); 
     } 
     write(f.get()); 
    } 
}