2010-08-16 11 views
16

Je suis un débutant dans Hadoop. J'essaie le programme Wordcount.MultipleOutputFormat dans hadoop

Maintenant, pour essayer plusieurs fichiers de sortie, j'utilise MultipleOutputFormat. ce lien m'a aidé à le faire. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

dans ma classe de pilote que j'avais

MultipleOutputs.addNamedOutput(conf, "even", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class); 

    MultipleOutputs.addNamedOutput(conf, "odd", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class);` 

et ma réduire classe est devenu cette

public static class Reduce extends MapReduceBase implements 
     Reducer<Text, IntWritable, Text, IntWritable> { 
    MultipleOutputs mos = null; 

    public void configure(JobConf job) { 
     mos = new MultipleOutputs(job); 
    } 

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException { 
     int sum = 0; 
     while (values.hasNext()) { 
      sum += values.next().get(); 
     } 
     if (sum % 2 == 0) { 
      mos.getCollector("even", reporter).collect(key, new IntWritable(sum)); 
     }else { 
      mos.getCollector("odd", reporter).collect(key, new IntWritable(sum)); 
     } 
     //output.collect(key, new IntWritable(sum)); 
    } 
    @Override 
    public void close() throws IOException { 
     // TODO Auto-generated method stub 
    mos.close(); 
    } 
} 

choses ont fonctionné, mais je reçois beaucoup de fichiers, (un étrange et un même pour chaque carte -reduce)

Question est: Comment puis-je avoir seulement 2 fichiers de sortie (impair & même) de sorte que chaque sortie impaire de chaque carte-réduire soit écrit dans cet impair fichier, et même pour pair. Chaque réducteur utilise un OutputFormat pour écrire des enregistrements dans

+5

Vous utilisez MultipleOutputs pas MultipleOutputFormat. Les deux sont des bibliothèques différentes. –

Répondre

3

C'est pourquoi vous obtenez un ensemble de fichiers impairs et pairs par réducteur. C'est par conception que chaque réducteur peut effectuer des écritures en parallèle.

Si vous ne voulez qu'un seul fichier impaire et un seul fichier pair, vous devez définir mapred.reduce.tasks sur 1. Mais les performances en souffriront, car tous les mappeurs seront alimentés dans un seul réducteur.

Une autre option consiste à modifier le processus de lecture de ces fichiers pour accepter plusieurs fichiers d'entrée ou d'écrire un processus séparé qui fusionne ces fichiers ensemble.

+3

insttead de changer les tâches rouges de la carte, j'ai outrepassé la fonction getFilenameForKeyValue() .. et cela a fonctionné ..... merci. – raj

1

Plusieurs fichiers de sortie seront générés en fonction du nombre de réducteurs.

Vous pouvez utiliser DSF Hadoop -getmerge aux sorties fusionnées

+0

merci :) mais je dois le faire par carte réduire seulement, – raj

3

j'ai écrit une classe pour le faire. utiliser Il suffit de votre travail:

job.setOutputFormatClass(m_customOutputFormatClass); 

C'est ma classe:

import java.io.IOException; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Map.Entry; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

/** 
* TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br> 
* <p> 
* <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an 
* {@link RecordWriter} instance per folder name. 
* </p> 
* <p> 
* In this class the folder name is defined by the written entry's key.<br> 
* To change this behavior simply extend this class and override the 
* {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own 
* {@link FolderNameExtractor} implementation. 
* </p> 
* 
* 
* @author ykesten 
* 
* @param <K> - Keys type 
* @param <V> - Values type 
*/ 
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> { 

    private String folderName; 

    private class MultipleFilesRecordWriter extends RecordWriter<K, V> { 

     private Map<String, RecordWriter<K, V>> fileNameToWriter; 
     private FolderNameExtractor<K, V> fileNameExtractor; 
     private TaskAttemptContext job; 

     public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) { 
      fileNameToWriter = new HashMap<String, RecordWriter<K, V>>(); 
      this.fileNameExtractor = fileNameExtractor; 
      this.job = job; 
     } 

     @Override 
     public void write(K key, V value) throws IOException, InterruptedException { 
      String fileName = fileNameExtractor.extractFolderName(key, value); 
      RecordWriter<K, V> writer = fileNameToWriter.get(fileName); 
      if (writer == null) { 
       writer = createNewWriter(fileName, fileNameToWriter, job); 
       if (writer == null) { 
        throw new IOException("Unable to create writer for path: " + fileName); 
       } 
      } 
      writer.write(key, value); 
     } 

     @Override 
     public void close(TaskAttemptContext context) throws IOException, InterruptedException { 
      for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) { 
       entry.getValue().close(context); 
      } 
     } 

    } 

    private synchronized RecordWriter<K, V> createNewWriter(String folderName, 
      Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) { 
     try { 
      this.folderName = folderName; 
      RecordWriter<K, V> writer = super.getRecordWriter(job); 
      this.folderName = null; 
      fileNameToWriter.put(folderName, writer); 
      return writer; 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 

    @Override 
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { 
     Path path = super.getDefaultWorkFile(context, extension); 
     if (folderName != null) { 
      String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName(); 
      path = new Path(newPath); 
     } 
     return path; 
    } 

    @Override 
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { 
     return new MultipleFilesRecordWriter(getFolderNameExtractor(), job); 
    } 

    public FolderNameExtractor<K, V> getFolderNameExtractor() { 
     return new KeyFolderNameExtractor<K, V>(); 
    } 

    public interface FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value); 
    } 

    private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value) { 
      return key.toString(); 
     } 
    } 

}