2010-06-07 10 views
3

Pour une partie de mon projet, j'ai besoin d'un système de planification de processus local qui me permettra de retarder l'exécution de la méthode sur quelques secondes. J'ai des milliers de "clients" de ce système, donc en utilisant threading.Timer pour chaque retard est une mauvaise idée parce que j'atteindrai rapidement la limite de fil OS. J'ai implemented un système qui utilise un seul thread pour le contrôle de la synchronisation.Ordonnancement d'appel de méthode rapide en Python

L'idée principale est de conserver la file d'attente de tâches (time + func + args + kwargs) triées et d'utiliser threading.Timer pour planifier/annuler les exécutions de la tête de cette file d'attente. Ce système fonctionne, mais je ne suis pas satisfait de la performance. ~ 2000 clients qui planifient des tâches factices toutes les ~ 10 secondes font que le processus prend 40% du temps CPU. En regardant la sortie du profileur, je vois que tout le temps est consacré à la construction du nouveau threading.Timer, à son démarrage et en particulier à la création de nouveaux threads.

Je crois qu'il y a un meilleur moyen. Maintenant, je pense à réécrire le LightTimer de sorte qu'il y aura un thread d'exécution contrôlable par threading.Event et plusieurs threads de chronométrage qui set() l'événement. Par exemple:

  • Je programme une tâche à appeler en 10 secondes. La tâche est ajoutée à une file d'attente. Le fil de synchronisation n ° 1 commence time.sleep(10) avant event.set()
  • Ensuite, je programme une tâche à appeler en 11 secondes. La tâche est ajoutée à la file d'attente. Rien ne se passe avec le fil de synchronisation, il remarquera une nouvelle tâche après le réveil.
  • Ensuite, je programme une tâche à appeler en 5 secondes. La tâche est ajoutée à la file d'attente. Le fil de synchronisation n ° 2 commence time.sleep(5) parce que # 1 dort déjà pendant un intervalle plus long.

J'espère que vous avez compris l'idée. Que penses-tu de cette façon? Y a-t-il un meilleur moyen? Peut-être que je peux utiliser certaines fonctionnalités du système Linux pour faire une solution optimale?

Répondre

2

Une autre implémentation que vous pouvez utiliser consiste à utiliser la méthode time.time() pour calculer l'heure absolue à laquelle chaque fonction en file d'attente doit être exécutée. Placez cette heure et votre fonction à appeler dans un wrapper d'objet qui remplace l'opérateur de comparaison en utilisant le temps d'exécution pour déterminer l'ordre. Ensuite, utilisez le module heapq pour maintenir un min-heap. Cela vous fournira une infrastructure de données efficace où l'élément 0 du tas est toujours votre prochain événement. Une façon d'implémenter les appels réels serait d'utiliser un thread séparé pour exécuter les rappels. Le tas devra être protégé avec un mutex et vous pouvez utiliser une variable de condition pour implémenter la planification. Dans une boucle infinie, effectuez une recherche la prochaine fois pour exécuter une fonction (élément 0 du tas) et utilisez la méthode wait() de la variable de condition avec le délai d'attente défini pour l'heure d'exécution suivante. Votre méthode d'insertion de segment peut ensuite utiliser la méthode notify() de la variable de condition pour réactiver le thread de planification tôt si la fonction nouvellement insérée doit se produire avant la plus ancienne déjà dans le tas.

+0

Hm ... très intéressant, merci! Surtout que je n'ai jamais regardé 'heapq' et je n'ai pas pensé à utiliser' threading.Condition' dans ce but. – nkrkv

2

Avez-vous regardé le module sched dans la bibliothèque standard Python? Exécuter le planificateur sur un thread dédié (et avoir toutes les actions planifiées être "mettre une méthode liée et ses args sur une file d'attente" à partir de quels threads dans un pool peler et l'exécuter - comme je l'ai écrit dans le chapitre Nutshell sur les threads, sauf que dans ce cas il n'y avait pas d'ordonnancement) devrait faire ce que vous voulez.

+0

Et comment gérer une instruction de docs * "la classe de l'ordonnanceur a des limitations en ce qui concerne la sécurité des threads, l'impossibilité d'insérer une nouvelle tâche avant celle actuellement en attente dans un planificateur en cours d'exécution" *? Cela signifie-t-il que I * peut * insérer une nouvelle tâche avant celle qui est actuellement en attente mais à partir du même thread ou dois-je générer plusieurs threads en cours d'exécution sched? – nkrkv

+0

@nailxx, cela nécessite un certain soin - cet avertissement concerne les utilisations "naïves", telles que l'utilisation du "normal" delayfunc (juste time.sleep) ou l'exécution de la méthode d'un planificateur à partir de plusieurs threads. Avec un delayfunc qui fait un Queue.get avec timeout sur une file d'attente dédiée (donc se réveille dès que quelque chose est poussé dans la file d'attente ... et le tire et effectue des appels de méthode de programmation si nécessaire) - et pousse d'autres threads cette file d'attente dédiée à la place des méthodes du planificateur appelant - J'ai utilisé avec succès une seule instance de planificateur dans un scénario multithread similaire. –

0

Il est peu probable que vous atteigniez la limite de thread OS avec "quelques milliers de clients"; vous pouvez consommer beaucoup de mémoire inutile avec les piles pour tous ces fils.Jetez un coup d'oeil à ce que fait le twisted, cela permet à un processus de multiplexer de nombreux événements (y compris les timers) d'une manière qui s'est avérée très bien fonctionner avec un grand nombre d'événements. Vous pouvez également combiner des modèles pilotés par événement et multiprocessus, en exécutant plusieurs processus par machine et en exécutant une logique événementielle dans chacun d'eux: un processus peut gérer 2 000 clients, vous pouvez toujours exécuter des processus 30x (à condition est suffisant ressource globale) et gagner un meilleur débit, en particulier sur le matériel multi-core moderne.

+0

* Il est peu probable que vous atteigniez la limite de thread OS * - Je l'ai réellement fait :) – nkrkv

+0

Je m'attends à ce que vous ayez épuisé l'espace d'adressage avec toutes ces piles. La taille de la pile par défaut de Linux est généralement de 1M (ou similaire), il suffit donc de quelques milliers de threads pour utiliser A/S dans un processus 32 bits. Le système d'exploitation a une limite beaucoup plus élevée. – MarkR