2010-12-10 38 views
7

J'essaie de trouver un moyen de faire un Lock Free OU non-bloquant pour faire un Ring Ringer pour consommateur simple/consommateur unique qui va écraser les données les plus anciennes dans le tampon. J'ai lu beaucoup d'algorithmes sans verrous qui fonctionnent quand vous "renvoyez faux" si le tampon est plein - c'est-à-dire, n'ajoutez pas; mais je ne trouve même pas de pseudo-code qui parle de la façon de le faire lorsque vous avez besoin d'écraser les données les plus anciennes. J'utilise GCC 4.1.2 (restriction au travail, je ne peux pas mettre à jour la version ...) et j'ai les bibliothèques Boost, et dans le passé j'ai fait mon propre type de variable Atomic < T> qui suit assez proche de la spécification upcomming (ce n'est pas parfait, mais il est thread-safe et fait ce dont j'ai besoin).C/C++ Mémoire tampon sans verrouillage (ou non bloquante) qui écrase les données les plus anciennes?

Quand j'y ai pensé, je me suis dit que l'utilisation de ces atomiques devrait vraiment régler le problème. certains psuedo code rugueux à ce que je pensais:

template< typename T , unsigned int Size> 
class RingBuffer { 
private: 
Atomic<unsigned int> readIndex; 
Atomic<unsigned int> writeIndex; 
enum Capacity { size = Size }; 
T* buf; 

unsigned int getNextIndex(unsigned int i) 
{ 
return (i + 1) % size; 
} 

public: 
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 } 
~RingBuffer() { //delete data } 
void produce(const T& t) 
{ 
if(writeIndex == getNextIndex(readIndex)) //1 
{ 
    readIndex = getNextIndex(readIndex); //2 
    } 
    buf[writeIndex] = t; 
    writeIndex = getNextIndex(writeIndex); //3 
} 

bool consume(T& t) 
{ 
    if(readIndex == writeIndex) //4 
    return false; 
    t = buf[readIndex]; 
    readIndex = getNexIndex(readIndex); //5 
    return true; 
} 

}; 

Pour autant que je peux dire, il n'y a pas de situation d'impasse ici, donc nous sommes en sécurité de ce (Si ma mise en œuvre ci-dessus est mal, même sur son pseudo-code leve, la critique constructive est toujours appréciée). Cependant, la condition de course BIG que je peux trouver est:

laisse supposer que le tampon est plein. c'est-à-dire, writeIndex +1 = readIndex; (1) se produit, tout comme la consommation est appelée. et est vrai (4) est faux, donc nous passons à lire à partir du tampon (5) se produit, et le readIndex est avancé un (donc il y a, en fait, l'espace dans le tampon (2) se produit, avançant readIndex En fait, c'est un problème classique de l'écrivain qui doit modifier le lecteur, provoquant une condition de concurrence.Sans bloquer réellement la liste entière chaque fois que j'y accède, je ne peux pas penser à un moyen de empêcher que cela se produise. Qu'est-ce que je manque ??

+0

Je ne pense pas qu'un algorithme sans verrou existe pour cela. La raison pour laquelle les modèles producteurs/consommateurs fonctionnent sans verrouillage est parce que les producteurs et les consommateurs n'ont jamais besoin de toucher les mêmes données. Mais ce que vous proposez intimement lie les deux ensemble, ce qui nécessitera une forme de verrou. –

+0

Vous ne pouvez certainement pas bloquer: vous n'avez aucun verrou. :-D Cela dit, je pense qu'un algorithme sans verrou pour cela serait difficile au mieux. Vous avez besoin de modifier les deux indices pour qu'ils soient atomiques ensemble, donc vous auriez vraiment besoin d'une > '. Vous devez également être capable de publier et de consommer les données de l'élément de façon atomique, ce qui est impossible à faire sans verrou dans le cas général et même pour des cas spécifiques, il est impossible de modifier les deux pointeurs et les données d'un élément . Votre algorithme a de nombreuses conditions de course. –

+0

Merci pour les commentaires les gars. Ouais, on dirait que mon intuition était bonne ... essayer de faire une version non bloquante ne fonctionnera pas très facilement. – Nik

Répondre

7
  1. Commencez par un seul producteur/multiple file d'attente des consommateurs avec des garanties de progrès appropriés.
  2. Si la file d'attente est pleine et que le push échoue, entrez une valeur. Ensuite, il y aura de l'espace pour pousser la nouvelle valeur.
+1

+1 - belle façon de penser :-). –

+0

intéressant, mais je vois une condition de concurrence potentielle dans le bit pop/push -> vous devez faire une boucle jusqu'à ce que la poussée réussisse au cas où un autre thread pousse après que vous avez sauté. Solution très intelligente quand même! –

+0

Merci, cela m'a vraiment aidé à le penser d'une manière beaucoup plus propre. Merci beaucoup à tous ceux qui ont posté! – Nik

1

Qu'est-ce que je manque ??

Lots:

  • disent que vous consommez un t alors qu'il est en cours remplacé par le producteur - vous détecter Comment allez/manipulation qui?
    • nombreuses options - par ex. do { copier la valeur sur; vérifier copie a séquence l'intégrité à l'aide de modification num etc. } while ( corrompus )
  • en utilisant des nombres atomiques ne suffit pas - vous devez également utiliser des boucles de type CAS pour affecter les incréments d'index (bien que je suppose que vous savez que, étant donné que vous dites que vous avez lu de nombreux articles sur ce déjà)
  • barrières mémoire

Mais, Écrivons que hors comme étant en dessous de votre niveau pseudo-code, et d'examiner votre question explicite:

  • Le point (5) nécessitera réellement une opération CAS. Si le readIndex a été correctement échantillonné/copié sur consume() - avant que le t (éventuellement corrompu) ait été copié - alors l'instruction CAS échouera si elle a déjà été incrémentée par le producteur. Au lieu du rééchantillonnage habituel et réessayez CAS, continuez simplement.
+0

quoi "point (5)"? –

+0

Salut Tony, excuses pour le mauvais code/pseudo code. Juste pour être sûr, n'aurais-je pas besoin d'une opération CAS pour (2) aussi? – Nik

+0

Aussi, pour la condition de sur-écriture, je crois que je peux faire disparaître ce problème si le tampon était effectivement rempli avec Atomic < T > au lieu de T lui-même. puisque l'accès serait gardé qui devrait résoudre le problème? – Nik

0

Voici un code de tampon circulaire sur les variables atomiques que j'ai récemment créées. Je l'ai modifié pour "remplacer" les données au lieu de retourner false. Avis de non-responsabilité - ce produit n'a pas encore été testé.

template<int capacity, int gap, typename T> class nonblockigcircular { 
    /* 
    * capacity - size of cicular buffer 
    * gap - minimum safety distance between head and tail to start insertion operation 
    *  generally gap should exceed number of threads attempting insertion concurrently 
    *  capacity should be gap plus desired buffer size 
    * T - is a data type for buffer to keep 
    */ 
    volatile T buf[capacity]; // buffer 

    std::atomic<int> t, h, ph, wh; 
    /* t to h data available for reading 
    * h to ph - zone where data is likely written but h is not updated yet 
    * to make sure data is written check if ph==wh 
    * ph to wh - zone where data changes in progress 
    */ 

    bool pop(T &pwk) { 
    int td, tnd; 

    do { 
     int hd=h.load()%capacity; 
     td=t.load()%capacity; 
     if(hd==td) return false; 
     tnd=(td+1)%capacity; 
    } while(!t.compare_exchange_weak(td, tnd)); 

    pwk=buf[td]; 
    return true; 
    } 


    const int count() { 
    return (h.load()+capacity-t.load()) % capacity; 
    } 

    bool push(const T &pwk) { 
    const int tt=t.load(); 
    int hd=h.load(); 

    if( capacity - (hd+capacity-tt) % capacity < gap) { 
     // Buffer is too full to insert 
     // return false; 
     // or delete last record as below 
     int nt=t.fetch_add(1); 
     if(nt==capacity-1) t.fetch_sub(capacity); 
     } 


    int nwh=wh.fetch_add(1); 
    if(nwh==capacity-1) wh.fetch_sub(capacity); 

    buf[nwh%capacity]=pwk; 

    int nph=ph.fetch_add(1); 
    if(nph==capacity-1) ph.fetch_sub(capacity); 

    if(nwh==nph) { 
     int ohd=hd; 
     while(! h.compare_exchange_weak(hd, nwh)) { 
     hd=h.load(); 
     if((ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity) break; 
     } 
    } 
    return true; 
    } 

};