2010-08-14 18 views
4

J'essaie de trouver la somme de tous les points en utilisant hadoop, Le problème que je rencontre est d'obtenir toutes les valeurs d'une clé donnée dans un seul réducteur. Ça ressemble à ça.Manipulation de l'itérateur dans mapreduce

Réducteur:

public static class Reduce extends MapReduceBase implements 
     Reducer<Text, IntWritable, Text, DoubleWritable> { 

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, DoubleWritable> output, Reporter reporter) 
      throws IOException { 
     Text word = new Text(); 

     Iterator<IntWritable> tr = values; 
     IntWritable v; 
     while (tr.hasNext()) { 
      v = tr.next(); 

      Iterator<IntWritable> td = values; 
      while (td.hasNext()) { 

       IntWritable u = td.next(); 
       double sum = u+v; 
       word.set(u + " + " + v); 
       output.collect(word, new DoubleWritable(sum)); 
      } 
     } 
    } 
} 

Et je suis en train de créer deux copies de la variable Iterator afin que je puisse passer par toutes les valeurs du deuxième iterator alors que je reçois une valeur unique de la précédente iterator (Deux alors que les boucles ci-dessus) mais les deux itérateurs conservent la même valeur tout le temps.

Je ne sais pas si c'est la bonne façon de le faire, Toute aide est vraiment appréciée.

Merci,

Tsegay

+0

J'essaie également de trouver un problème similaire. J'ai besoin de passer en revue les enregistrements deux fois dans la fonction de réduction. J'utilise hadoop streaming avec python et ne pas comment rembobiner l'itérateur pour les enregistrements dans le réducteur. –

Répondre

12

Je ne sais pas exactement ce que vous essayez d'accomplir, mais je sais que ce bien: le comportement des itérateurs de Hadoop est un peu étrange. L'appel de Iterator.next() retournera toujours l'instance SAME EXACT de IntWritable, avec le contenu de cette instance remplacé par la valeur suivante. Donc, tenir une référence à IntWritable à travers les appels à Iterator.next() est presque toujours une erreur. Je crois que ce comportement est conçu pour réduire la quantité de création d'objet et les frais généraux du GC. Une façon de contourner ce problème est d'utiliser WritableUtils.clone() pour cloner l'instance que vous essayez de conserver entre les appels à Iterator.next().

+0

Oui, cela m'est arrivé aujourd'hui. Vive les itérateurs Hadoop! Ce comportement est-il documenté officiellement (sauf pour les blogs et ici)? –

1

En passant par your previous question,, vous semblez être bloqué sur the iterator problem piccolbo described. La formulation de votre réducteur indique également que vous avez oublié ses algorithmes proposés pour l'approche naïve ... qui fonctionnera, mais de façon sous-optimale.

Permettez-moi de nettoyer votre code un peu avec ma réponse:

// Making use of Hadoop's Iterable reduce, assuming it's available to you 
// 
// The method signature is: 
// 
// protected void reduce(KEYIN key, java.lang.Iterable<VALUEIN> values, 
// org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context 
// context) throws java.io.IOException, java.lang.InterruptedException 
// 
public void reduce(Text key, Iterable<IntWritable> values, Context context) 
     throws IOException, InterruptedException { 

    // I assume you declare this here to save on GC 
    Text outKey = new Text(); 
    IntWritable outVal = new IntWritable(); 

    // Since you've forgone piccolbo's approach, you'll need to maintain the 
    // data structure yourself. Since we always walk the list forward and 
    // wish to optimize the insertion speed, we use LinkedList. Calls to 
    // IntWritable.get() will give us an int, which we then copy into our list. 
    LinkedList<Integer> valueList = new LinkedList<Integer>(); 

    // Here's why we changed the method signature: use of Java's for-each 
    for (IntWritable iw: values) { 
     valueList.add(iw.get()); 
    } 

    // And from here, we construct each value pair as an O(n^2) operation 
    for (Integer i: valueList) { 
     for (Integer j: valueList) { 
      outKey.set(i + " + " + j); 
      outVal.set(i + j); 
      context.write(outKey, outVal); 
     } 
    } 

    // Do note: I've also changed your return value from DoubleWritable to 
    // IntWritable, since you should always be performing integer operations 
    // as defined. If your points are Double, supply DoubleWritable instead. 
} 

Cela fonctionne, mais il fait plusieurs hypothèses qui limitent les performances lors de la construction de votre matrice de distance, notamment en exigeant la combinaison à effectuer dans un opération de réduction unique. Envisagez piccolbo's approach si vous connaissez à l'avance la taille et la dimensionnalité de votre jeu de données d'entrée. Cela devrait être disponible, dans le pire des cas, en parcourant les lignes d'entrée en temps linéaire.

(Voir this thread pourquoi nous ne pouvons pas mettre en œuvre cela comme un iterator avant.)

28

Les itérateurs du réducteur ne sont pas aussi simple que vous pourriez penser.

Le problème est que le nombre total d'éléments que vous itérez peut ne pas tenir dans la mémoire. Cela signifie que l'itérateur peut lire sur le disque. Si vous avez deux copies indépendantes de l'itérateur, vous pouvez avoir l'une d'entre elles très en avance sur l'autre, ce qui implique que les données entre les deux itérateurs ne peuvent pas être supprimées. Pour simplifier l'implémentation, Hadoop ne prend pas en charge plusieurs itérateurs pour les valeurs réduites.

L'impact pratique de ceci est que vous ne pouvez pas passer deux fois par le même itérateur. Ce n'est pas bien, mais c'est le cas. Si vous savez pertinemment que le nombre d'éléments correspondra à la mémoire, vous pouvez copier tous les éléments dans une liste comme suggéré par MrGomez.Si vous ne le savez pas, vous devrez peut-être utiliser un stockage secondaire. La meilleure approche consiste à redéfinir votre programme de sorte que vous n'ayez pas besoin de stockage illimité dans le réducteur. Cela peut être un peu difficile, mais il existe des approches standard du problème.

Pour votre problème particulier, vous avez une croissance quadratique de la taille de sortie par rapport au plus grand ensemble d'entrée de réduction. C'est généralement une très mauvaise idée. Dans la plupart des cas, vous n'avez pas besoin de TOUTES les paires, seulement les paires les plus importantes. Si vous pouvez découper l'ensemble des paires d'une manière ou d'une autre, alors vous êtes tous ensemble et vous pourrez peut-être supprimer la contrainte toutes les paires. Par exemple, si vous essayez de trouver les 100 paires avec la plus grande somme pour chaque ensemble de réduction, vous pouvez conserver une file d'attente prioritaire avec les 100 plus grandes entrées vues jusqu'à maintenant et une file d'attente prioritaire avec les 100 plus grandes sommes vues loin. Pour chaque nouvelle entrée, vous pouvez former la somme avec les 100 plus grands nombres vus jusqu'à présent et essayer de coller ces sommes dans la deuxième file d'attente. Enfin, vous devez coller la nouvelle entrée dans la première file d'attente et ajuster les deux files d'attente à 100 éléments en supprimant les plus petites valeurs (si nécessaire). Dans la méthode close de la réduction, vous devez vider la file d'attente prioritaire. Cette approche garantit que vous n'avez besoin que de min (n^2, 200) éléments de stockage évitant le problème n^2 et évitant le double passage à travers l'entrée en gardant les 100 plus gros objets vus plutôt que tous les objets vus.

1

Pour copier l'itérateur, vous ne pouvez pas affecter l'itérateur à une nouvelle variable. Vous devriez "cloner" un itérateur à une nouvelle variable de la classe d'itérateur. Lorsque l'itérateur A affecte une autre variable d'itérateur B, les deux variables de l'itérateur pointent les mêmes données.