2010-07-05 13 views
2

Je reçois à un rythme extrêmement rapide, les tweets d'une connexion de longue durée au serveur de diffusion en streaming de l'API Twitter. Je procède en faisant un peu de traitement de texte lourd et enregistre les tweets dans ma base de données.Traitement des données de diffusion en grand volume avec Twisted ou en utilisant des threads, file d'attente en Python

J'utilise PyCurl pour la fonction de connexion et de rappel qui s'occupe du traitement de texte et de l'enregistrement dans la base de données. Voir ci-dessous mon approche qui ne fonctionne pas correctement.

Je ne suis pas familier avec la programmation réseau, je voudrais donc savoir: Comment utiliser Threads, Queue ou Twisted pour résoudre ce problème?

def process_tweet(): 
    # do some heaving text processing 


def open_stream_connection(): 
    connect = pycurl.Curl() 
    connect.setopt(pycurl.URL, STREAMURL) 
    connect.setopt(pycurl.WRITEFUNCTION, process_tweet) 
    connect.setopt(pycurl.USERPWD, "%s:%s" % (TWITTER_USER, TWITTER_PASS)) 
    connect.perform() 
+1

Définir 'extrêmement rapide' dans les messages par seconde, et élaborer sur' traitement lourd'. – MattH

+0

également définir "ne fonctionne pas correctement" – nosklo

+0

Upvoted pour le nom d'utilisateur. Désolé, ne peut pas l'aider: P –

Répondre

0

Voici une configuration simple si vous êtes d'accord pour utiliser une seule machine.

1 connexion accepte les connexions. Une fois la connexion acceptée, la connexion acceptée est transmise à un autre thread pour traitement.

Vous pouvez, bien sûr, utiliser des processus (par exemple, en utilisant multiprocessing) au lieu de threads, mais je ne connais pas multiprocessing pour donner des conseils. La configuration serait la même: 1 processus accepte les connexions, puis les passe aux sous-processus. Si vous devez répartir le traitement sur plusieurs machines, la solution consiste simplement à insérer le message dans la base de données, puis à notifier les opérateurs au sujet du nouvel enregistrement (ceci nécessitera une sorte de coordination/verrouillage entre les travailleurs). Si vous voulez éviter de toucher la base de données, vous devrez rediriger les messages de votre processus réseau vers les travailleurs (et je ne connais pas assez bien les réseaux bas niveau pour vous dire comment faire :))

0

Je propose cette organisation:

  • un processus lit Twitter, tweets enfourne dans la base
  • un ou plusieurs processus lit base de données, traite chaque, insère dans la nouvelle base de données. Les tweets originaux ont été supprimés ou marqués comme étant traités.

Autrement dit, vous avez deux autres processus/threads. La base de données de tweet pourrait être vue comme une file d'attente de travail. Les processus de travail multiples suppriment les tâches (tweets) de la file d'attente et créent des données dans la seconde base de données.

+2

Une base de données semble être exagérée comme un réceptacle temporaire. – Oddthinking

+0

D'accord. Probablement préférable d'utiliser l'architecture MT Queue proposée par @Oddthinking – Sid

1

Vous devriez avoir un certain nombre de threads recevant les messages quand ils arrivent. Ce nombre devrait probablement être 1 si vous utilisez pycurl, mais devrait être plus élevé si vous utilisez httplib - l'idée étant que vous voulez pouvoir avoir plus que une requête sur l'API Twitter à la fois, donc il y a une quantité constante de travail à traiter.

Lorsque chaque Tweet arrive, il est placé dans une file d'attente. La file d'attente garantit la sécurité des threads dans les communications - chaque tweet ne sera traité que par un thread de travail.

Un pool de threads de travail est responsable de la lecture de la file d'attente et du traitement du Tweet. Seuls les tweets intéressants doivent être ajoutés à la base de données. Comme la base de données est probablement le goulot d'étranglement, il y a une limite au nombre de threads dans le pool qui valent la peine d'être ajoutés - plus de threads ne le feront pas traiter plus vite, cela signifiera simplement que plus de threads attendent dans le pool. file d'attente pour accéder à la base de données.

Ceci est un idiome Python assez commun.Cette architecture ne peut évoluer que dans une certaine mesure, c'est-à-dire ce qu'une machine peut traiter.