J'essaie de déterminer la meilleure façon d'utiliser des agents pour consommer des éléments d'une file d'attente de messages (Amazon SQS). À l'heure actuelle, j'ai une fonction (process-queue-item) qui saisit un élément de la file d'attente et le traite.Agents Clojure consommant à partir d'une file d'attente
Je veux traiter ces éléments simultanément, mais je ne peux pas comprendre comment contrôler les agents. Fondamentalement, je veux garder tous les agents occupés autant que possible sans tirer sur de nombreux éléments de la file d'attente et de développer un arriéré (je vais avoir ce fonctionnement sur quelques machines, donc les éléments doivent être laissés dans la file d'attente jusqu'à ce qu'ils sont vraiment nécessaires).
Quelqu'un peut-il me donner des indications sur l'amélioration de ma mise en œuvre?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))
Y at-il une certaine façon, vous pouvez traiter le Message Queue comme un seq, puis juste utiliser pmap pour obtenir la parallélisation? –
@Alex Stoddard: Dans mon cas, process-queue-item bloque réellement sur les E/S réseau, donc je ne pense pas que pmap soit le bon choix car il utilise seulement autant de threads que la machine a de cœurs. – erikcw
@erikw: Bien sûr, mais ce n'est qu'un détail d'implémentation pmap (threads = #cores + 2). Aucune raison pour laquelle vous ne pourriez pas écrire une version de pmap avec un nombre paramétré de threads.Voir la première ligne de la source pmap: (let [n (+ 2 (.. runtime getRuntime availableProcessors)) –