2010-11-11 21 views
5

Je veux convertir mon système de files d'attente de tâches de mon cru en une file d'attente de tâches basée sur le céleri, mais une fonctionnalité que j'ai actuellement me cause de la détresse.Comment puis-je capturer tous les enregistrements du journal python générés lors de l'exécution d'une série de tâches Celery?

À l'heure actuelle, ma file d'attente de tâches fonctionne très grossièrement; Je lance le travail (qui génère des données et le télécharge vers un autre serveur), recueille la journalisation à l'aide d'une variante de la bibliothèque de capture de journaux de Nose, puis stocke la journalisation de la tâche dans la base de données d'application.

Je voudrais le décomposer en trois tâches:

  1. recueillir des données
  2. données de téléchargement
  3. résultats du rapport (y compris toute exploitation des deux tâches précédentes)

Les vrai kicker ici est la collection de journalisation. À l'heure actuelle, en utilisant la capture de journal, j'ai une série d'enregistrements de journal pour chaque appel de journal fait pendant le processus de génération de données et de téléchargement. Ceux-ci sont requis à des fins de diagnostic. Étant donné qu'il n'est même pas garanti que les tâches s'exécutent dans le même processus, la façon dont j'accomplirai cela dans une file d'attente de tâches Celery n'est pas claire.

Ma solution idéale à ce problème sera une méthode triviale et idéalement peu invasive de capturer toute exploitation forestière au cours des tâches précédentes (1, 2) et de la rendre disponible à la tâche rapporteur (3)

Suis-je mieux de rester assez grossier avec ma définition de tâche, et de mettre tout ce travail en une seule tâche? ou existe-t-il un moyen de passer la journalisation capturée existante afin de la récupérer à la fin?

Répondre

0

On dirait qu'une sorte de 'veilleur' ​​serait idéal. Si vous pouvez regarder et consommer les journaux sous forme de flux, vous pouvez filtrer les résultats au fur et à mesure qu'ils arrivent. Comme l'observateur fonctionne séparément et n'a donc aucune dépendance vis-à-vis de ce qu'il regarde, je crois que cela répondrait à vos besoins. solution invasive.

0

Django Sentry est un utilitaire de journalisation pour Python (et Django), et prend en charge Celery.

1

Je suppose que vous utilisez le module logging. Vous pouvez utiliser un enregistreur nommé distinct par ensemble de tâches pour effectuer le travail. Ils hériteront de toute la configuration du niveau supérieur.

dans task.py:

import logging 

@task 
step1(*args, **kwargs): 
    # `key` is some unique identifier common for a piece of data in all steps of processing 
    logger = logging.getLogger("myapp.tasks.processing.%s"%key) 
    # ... 
    logger.info(...) # log something 

@task 
step2(*args, **kwargs): 
    logger = logging.getLogger("myapp.tasks.processing.%s"%key) 
    # ... 
    logger.info(...) # log something 

Ici, tous les documents ont été envoyés au même enregistreur nommé. Maintenant, vous pouvez utiliser 2 méthodes pour récupérer ces documents:

  1. Configurer écouteur de fichier avec un nom qui dépend du nom de l'enregistreur. Après la dernière étape, lisez simplement toutes les informations de ce fichier. Assurez-vous que la mise en mémoire tampon de sortie est désactivée pour cet écouteur ou que vous risquez de perdre des enregistrements.

  2. Créer un programme d'écoute personnalisé qui accumulerait des enregistrements en mémoire, puis les renvoyer tous à la demande. J'utiliserais memcached pour le stockage ici, c'est plus simple que de créer votre propre stockage inter-processus.