2010-04-08 7 views
26

J'essaie de déterminer la meilleure façon d'utiliser des agents pour consommer des éléments d'une file d'attente de messages (Amazon SQS). À l'heure actuelle, j'ai une fonction (process-queue-item) qui saisit un élément de la file d'attente et le traite.Agents Clojure consommant à partir d'une file d'attente

Je veux traiter ces éléments simultanément, mais je ne peux pas comprendre comment contrôler les agents. Fondamentalement, je veux garder tous les agents occupés autant que possible sans tirer sur de nombreux éléments de la file d'attente et de développer un arriéré (je vais avoir ce fonctionnement sur quelques machines, donc les éléments doivent être laissés dans la file d'attente jusqu'à ce qu'ils sont vraiment nécessaires).

Quelqu'un peut-il me donner des indications sur l'amélioration de ma mise en œuvre?

(def active-agents (ref 0)) 

(defn process-queue-item [_] 
    (dosync (alter active-agents inc)) 
    ;retrieve item from Message Queue (Amazon SQS) and process 
    (dosync (alter active-agents dec))) 

(defn -main [] 
    (def agents (for [x (range 20)] (agent x))) 

    (loop [loop-count 0] 

    (if (< @active-agents 20) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent)) 
     ;should skip this agent until later if it is still busy processing (not sure how) 
     (send-off agent process-queue-item))) 

    ;(apply await-for (* 10 1000) agents) 
    (Thread/sleep 10000) 
    (logging/info (str "ACTIVE AGENTS " @active-agents)) 
    (if (> 10 loop-count) 
     (do (logging/info (str "done, let's cleanup " count)) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent))) 
     (apply await agents) 
     (shutdown-agents)) 
     (recur (inc count))))) 
+0

Y at-il une certaine façon, vous pouvez traiter le Message Queue comme un seq, puis juste utiliser pmap pour obtenir la parallélisation? –

+0

@Alex Stoddard: Dans mon cas, process-queue-item bloque réellement sur les E/S réseau, donc je ne pense pas que pmap soit le bon choix car il utilise seulement autant de threads que la machine a de cœurs. – erikcw

+0

@erikw: Bien sûr, mais ce n'est qu'un détail d'implémentation pmap (threads = #cores + 2). Aucune raison pour laquelle vous ne pourriez pas écrire une version de pmap avec un nombre paramétré de threads.Voir la première ligne de la source pmap: (let [n (+ 2 (.. runtime getRuntime availableProcessors)) –

Répondre

6

Ce que vous demandez est un moyen de continuer à distribuer des tâches, mais avec une limite supérieure. Une approche simple consiste à utiliser un sémaphore pour coordonner la limite. Voici comment j'aborder:

(let [limit (.availableProcessors (Runtime/getRuntime)) 
     ; note: you might choose limit 20 based upon your problem description 
     sem (java.util.concurrent.Semaphore. limit)] 
    (defn submit-future-call 
    "Takes a function of no args and yields a future object that will 
    invoke the function in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [#^Callable task] 
    ; take a slot (or block until a slot is free) 
    (.acquire sem) 
    (try 
     ; create a future that will free a slot on completion 
     (future (try (task) (finally (.release sem)))) 
     (catch java.util.concurrent.RejectedExecutionException e 
     ; no task was actually submitted 
     (.release sem) 
     (throw e))))) 

(defmacro submit-future 
    "Takes a body of expressions and yields a future object that will 
    invoke the body in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [& body] `(submit-future-call (fn [] [email protected]))) 

#_(example 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    ;; blocks at this point for a 2 processor PC until the previous 
    ;; two futures complete 
    #<[email protected]: :pending> 
    ;; then submits the job 

Avec cela en place maintenant, vous avez juste besoin de coordonner la façon dont les tâches elles-mêmes sont prises. On dirait que vous avez déjà les mécanismes en place pour le faire. Boucle (submit-future (process-queue-item))

4

Peut-être pourriez-vous utiliser la fonction seque? (doc seque) citant:

clojure.core/seque 
([s] [n-or-q s]) 
    Creates a queued seq on another (presumably lazy) seq s. The queued 
    seq will produce a concrete seq in the background, and can get up to 
    n items ahead of the consumer. n-or-q can be an integer n buffer 
    size, or an instance of java.util.concurrent BlockingQueue. Note 
    that reading from a seque can block if the reader gets ahead of the 
    producer. 

Ce que j'ai à l'esprit est une séquence paresseux obtenir des éléments de file d'attente sur le réseau; vous envelopper dans seque, mettre cela dans un Ref et avoir des agents de travail consomment des éléments hors de ce seque. seque renvoie quelque chose qui ressemble à un seq normal du point de vue de votre code, la magie de la file d'attente se produisant de manière transparente. Notez que si la séquence que vous mettez à l'intérieur est fragmentée, elle sera toujours forcée à un morceau à la fois. Notez également que l'appel initial à seque lui-même semble bloquer jusqu'à ce qu'un ou deux éléments initiaux soient obtenus (ou un fragment, selon le cas, je pense que cela a plus à voir avec les séquences paresseuses que le seque lui-même).

Un croquis du code (un vraiment d'un peu précis, pas testé du tout):

(defn get-queue-items-seq [] 
    (lazy-seq 
    (cons (get-queue-item) 
     (get-queue-items-seq)))) 

(def task-source (ref (seque (get-queue-items-seq)))) 

(defn do-stuff [] 
    (let [worker (agent nil)] 
    (if-let [result 
      (dosync 
       (when-let [task (first @task-source)] 
       (send worker (fn [_] (do-stuff-with task)))))] 
     (do (await worker) 
      ;; maybe do something with worker's state 
      (do-stuff))))) ;; continue working 

(defn do-lots-of-stuff [] 
    (let [fs (doall (repeatedly 20 #(future (do-stuff))))] 
    fs))) 

En fait, vous aurait probablement besoin d'un producteur plus complexe de l'élément de file d'attente suivants afin que vous puissiez demander pour arrêter de produire de nouveaux articles (une nécessité si l'ensemble doit pouvoir être fermé gracieusement, les futurs mourront quand la source de tâche est sèche, utilisez future-done? pour voir s'ils l'ont déjà fait). Et c'est juste quelque chose que je peux voir à première vue ... Je suis sûr qu'il y a plus de choses à polir ici. Je pense que l'approche générale fonctionnerait, cependant.

+0

J'ai ajouté une correction à l'avant-dernière ligne de l'esquisse de code dans laquelle les contrats à terme seront effectivement créés. (Type de crucial à l'idée, vraiment ... :-)) –

+0

J'essaie de comprendre ce code. Pourquoi la tâche-source est-elle une ref? Vous ne semblez pas le modifier à tout moment. –

+0

@Siddhartha Reddy: À première vue, je dirais que c'est la raison pour laquelle j'ai appelé le code "* really * sketchy". ;-) Je suppose que cela aurait besoin d'un '(alter task-source rest)' (ou 'next') dans le' when-let' à l'intérieur de 'dosync' pour être utile. En fait, en pensant à cela encore, je me demande si l'utilisation de 'seque' est une si bonne idée après tout; il me semble maintenant qu'il augmente le nombre d'éléments de la file d'attente qui seraient perdus en cas de crash de la machine locale (puisque 'seque' tire les éléments avant qu'ils ne soient demandés par les travailleurs). Là encore, dans certains scénarios, il pourrait être bon en termes de performances; c'est –

23
(let [switch (atom true) ; a switch to stop workers 
     workers (doall 
       (repeatedly 20 ; 20 workers pulling and processing items from SQS 
        #(future (while @switch 
          (retrieve item from Amazon SQS and process)))))] 
    (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-) 
    (reset! switch false) ; stop ! 
    (doseq [worker workers] @worker)) ; waiting for all workers to be done 
+2

Cela ne fonctionne plus avec la version 1.4 ('future' et' future-call' ne renvoient pas 'IFn', ce que' repeat' requiert). Vous pouvez facilement envelopper un futur dans une fonction, en ajoutant précédement '' (future'' avec '#', cependant.) –

+3

@AlexB bonne prise, ce n'est même pas un problème 1.4: le # aurait dû être là. Merci! – cgrand

0

Je ne sais pas comment ce idiomatiques est, comme je suis un débutant avec la langue, mais la solution suivante fonctionne pour moi:

(let [number-of-messages-per-time 2 
     await-timeout 1000] 
    (doseq [p-messages (partition number-of-messages-per-time messages)] 
    (let [agents (map agent p-messages)] 
     (doseq [a agents] (send-off a process)) 
     (apply await-for await-timeout agents) 
     (map deref agents))))