2010-09-19 28 views
5

Je suis un débutant Haskell et j'ai pensé que ce serait un bon exercice. J'ai une mission où je dois lire le fichier dans un thread A, gérer les lignes de fichiers dans des threads b_i, puis afficher les résultats en fil C.Limitation de l'utilisation de la mémoire lors de la lecture de fichiers

J'ai mis en œuvre jusque-là déjà, mais l'une des exigences est que nous ne pouvons pas croire que le fichier entier s'inscrit dans la mémoire. J'espérais que paresseux IO et garbage collector le feraient pour moi, mais hélas l'utilisation de la mémoire ne cesse d'augmenter et d'augmenter.

Le thread de lecture (A) lit le fichier avec readFile qui est ensuite compressé avec des numéros de ligne et enveloppé dans Just. Ces lignes compressées sont ensuite écrites à Control.Concurrent.Chan. Chaque thread consommateur B a son propre canal. Chaque consommateur lit son propre canal lorsqu'il a des données et si la regex correspond, il est sorti sur son propre canal de sortie enveloppé dans Maybe (fait de listes).

L'imprimante vérifie le canal de sortie de chacun des threads B. Si aucun des résultats est Nothing, la ligne est imprimée. Depuis à ce point il devrait y avoir aucune référence aux anciennes lignes, je pensais que le collecteur de déchets serait en mesure de libérer ces lignes, mais hélas, je semble être dans le mauvais ici.

Le fichier est .lhs ici: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

La question est, comment puis-je limiter l'utilisation de la mémoire, ou permettre au collecteur des ordures pour enlever les lignes.

Extraits selon la demande. Si tout va bien indenter est pas trop mal détruite :)

data Global = Global {done :: MVar Bool, consumers :: Consumers} 
type Done = Bool 
type Linenum = Int 
type Line = (Linenum, Maybe String) 
type Output = MVar [Line] 
type Input = Chan Line 
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output))) 
type State a = ReaderT Global IO a 


producer :: [Input] -> FilePath -> State() 
producer c p = do 
    liftIO $ Main.log "Starting producer" 
    d <- asks done 
    f <- liftIO $ readFile p 
    mapM_ (\l -> mapM_ 
    (liftIO . flip writeChan l) c) 
    $ zip [1..] $ map Just $ lines f 
    liftIO $ modifyMVar_ d (return . not) 

printer :: State() 
printer = do 
    liftIO $ Main.log "Starting printer" 
    c <- (fmap (map (snd . snd) . M.elems) 
    (asks consumers >>= liftIO . readMVar)) 
    uniq' c 
    where head' :: Output -> IO Line 
    head' ch = fmap head (readMVar ch) 

    tail' = mapM_ (liftIO . flip modifyMVar_ 
     (return . tail)) 

    cont ch = tail' ch >> uniq' ch 

    printMsg ch = readMVar (head ch) >>= 
     liftIO . putStrLn . fromJust . snd . head 

    cempty :: [Output] -> IO Bool 
    cempty ch = fmap (any id) 
     (mapM (fmap ((==) 0 . length) . readMVar) ch) 

    {- Return false unless none are Nothing -} 
    uniq :: [Output] -> IO Bool 
    uniq ch = fmap (any id . map (isNothing . snd)) 
     (mapM (liftIO . head') ch) 

    uniq' :: [Output] -> State() 
    uniq' ch = do 
     d <- consumersDone 
     e <- liftIO $ cempty ch 
     if not e 
     then do 
      u <- liftIO $ uniq ch 
      if u then cont ch else do 
     liftIO $ printMsg ch 
     cont ch 
      else unless d $ uniq' ch 

Répondre

6

Programmation concurrente propose aucun ordre d'exécution définie sauf si vous forcez un vous-même avec MVAR et autres. Il est donc probable que le fil du producteur collera toutes/toutes les lignes du canal avant que le consommateur ne les lise et ne les transmette. Une autre architecture qui devrait répondre aux exigences est juste un thread A appelez le fichier read paresseux et coller le résultat dans un mvar. Ensuite, chaque thread consommateur prend la mvar, lit une ligne, puis remplace la mvar avant de continuer à gérer la ligne. Même alors, si le thread de sortie ne peut pas suivre, alors le nombre de lignes correspondantes stockées sur le chan là peut s'accumuler arbitrairement.

Ce que vous avez est une architecture poussée. Pour vraiment le faire fonctionner dans un espace constant, pensez en termes de demande. Trouvez un mécanisme tel que le thread de sortie signale aux threads de traitement qu'ils doivent faire quelque chose, et que les threads de traitement signalent au thread de lecture qu'ils doivent faire quelque chose. Une autre façon d'y parvenir est d'avoir des canaux de taille limitée à la place - de sorte que le thread du lecteur bloque lorsque les threads du processeur n'ont pas rattrapé, et donc les threads du processeur bloquent quand le thread de sortie n'a pas rattrapé.

Dans l'ensemble, le problème me rappelle le benchmark de Tim Bray, bien que les exigences soient quelque peu différentes. Dans tous les cas, cela a conduit à une discussion générale sur la meilleure façon d'implémenter le grep multicœur. La grande punchline était que le problème est lié à l'E/S, et vous voulez plusieurs threads de lecture sur les fichiers mmapped.

Voir ici pour plus que vous aurez toujours voulu savoir: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

+4

BoundedChan est sur hackage exactement ce type d'utilisation. –

+0

Merci Tom et sciv. Je vais essayer de l'implémenter et marquer comme une réponse si cela fonctionne – Masse