De Don Syme blog (http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx) J'ai essayé d'implémenter un écouteur de flux Twitter. Mon but est de suivre les directives de la documentation de l'API de Twitter qui dit "que les tweets doivent souvent être sauvegardés ou mis en file d'attente avant d'être traités lors de la construction d'un système à haute fiabilité".Twitter flux api avec des agents en F #
Donc mon code doit avoir deux composantes:
- Une file d'attente qui empile et traite chaque JSON
- statut/Tweet Quelque chose à lire le flux Twitter qui déverse à la file d'attente le tweet dans les chaînes de JSON
Je choisis les éléments suivants:
- Un agent auquel je posterai chaque tweet , Qui décode le JSON, et décharges à la base de données
- Un simple http WebRequest
Je voudrais également jeter dans un fichier texte d'erreur, d'insérer dans la base de données. (Je vais probablement passer à un agent superviseur pour toutes les erreurs).
Deux problèmes:
- est ma stratégie ici tout bon? Si je comprends bien, l'agent se comporte comme une file d'attente intelligente et traite ses messages de manière asynchrone (s'il a 10 gars dans sa file d'attente, il en traitera un certain nombre au lieu d'attendre que le 1 er termine, etc. ...), correct ?
- Selon la publication de Don Syme, tout ce qui est avant est isolé, StreamWriter et le vidage de la base de données sont alors isolés. Mais parce que j'en ai besoin, je ne ferme jamais ma connexion à la base de données ...?
Le code ressemble à quelque chose comme:
let dumpToDatabase databaseName =
//opens databse connection
fun tweet -> inserts tweet in database
type Agent<'T> = MailboxProcessor<'T>
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
async{
use w2 = new StreamWriter(@"\Errors.txt")
let dumpError =fun (error:string) -> w2.WriteLine(error)
let dumpTweet = dumpToDatabase "stream"
while true do
let! msg = inbox.Receive()
try
let tw = decode msg
dumpTweet tw
with
| :? MySql.Data.MySqlClient.MySqlException as ex ->
dumpError (msg+ex.ToString())
| _ as ex ->()
}
)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
Merci beaucoup!
Modifier code avec l'agent de traitement:
let dumpToDatabase (tweets:tweet list)=
bulk insert of tweets in database
let agentProcessor =
Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
async{
while true do
let! msg = inbox.Receive()
try
msg
|> List.map(decode)
|> dumpToDatabase
with
| _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
}
)
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
let rec loop messageList count = async{
try
let! newMsg = inbox.Receive()
let newMsgList = newMsg::messageList
if count = 10 then
agentProcessor.Post(newMsgList)
return! loop [] 0
else
return! loop newMsgList (count+1)
with
| _ as ex -> Console.WriteLine("Dump "+ex.ToString())
}
loop [] 0)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
merci pour votre réponse tomas. Quand le travail est terminé, j'essaierai d'écrire le traitement en bloc, cela me semblera amusant (cela nécessitera probablement beaucoup plus d'études ...)! – jlezard
a finalement eu le temps d'essayer d'ajouter un autre agent, semble bien fonctionner. Je suis un peu troublé par votre dernier point: à moins que je me trompe Recevoir tue l'agent après le temps spécifié, donc je ne vois pas comment l'utiliser? Merci! – jlezard
@jlezard: Je pense que l'exception vous avertit qu'aucun message n'a été reçu, mais qu'il maintient l'agent dans un état utilisable. Cependant, c'est une meilleure idée d'utiliser 'TryReceive' qui renvoie' None' quand il expire (je ne me suis pas rendu compte qu'il existe quand j'ai écrit la réponse!) –