2010-09-21 20 views
12

J'ai quelque chose comme une file d'attente de travaux sur RabbitMQ et, sur une demande d'annulation d'un travail, je voudrais retirer les tâches qui n'ont pas encore commencé le traitement (leurs messages n'ont pas été reçus), ce qui correspond pour retirer ces messages des files d'attente auxquelles ils ont été routés.Comment faire pour rétracter un message dans RabbitMQ?

Je n'ai pas trouvé cette fonctionnalité dans AMQP ou dans l'API RabbitMQ; peut-être n'ai-je pas cherché assez bien? Ou devrais-je utiliser une solution de contournement (ce n'est pas difficile, mais quand même)?

Répondre

2

Au moins deux façons d'atteindre votre cible:

  • basic.reject sera requeue message si requeue=true est réglé (sinon il refusera un message). (Supporté depuis RabbitMQ 2.0.0, voir http://www.rabbitmq.com/blog/2010/08/03/well-ill-let-you-go-basicreject-in-rabbitmq/).

  • basic.recover demandera au courtier de redistribuer les messages non acquittés sur le canal. RabbitMQ ne vous permet pas de modifier ou de supprimer des messages après qu'ils ont été mis en file d'attente.

+2

Non, cela résout un problème différent. Je n'ai pas besoin de rejeter un message du côté des consommateurs, je veux annuler sa livraison du côté du producteur, afin que le message ne parvienne pas au consommateur comme s'il n'avait jamais existé. Dans mon problème, un consommateur ne peut pas décider si un message doit être rejeté. – jkff

+4

Si des messages ont été publiés avec succès et que vous devez les supprimer de la file d'attente connue, abonnez-vous et consommez-les au producteur. –

4

Pour cela, vous voulez un type de base de données pour contenir l'état de chaque travail, et d'utiliser RabbitMQ pour informer les parties intéressées des changements dans cet état.

Pour des volumes plus faibles, vous pouvez le combiner avec une file d'attente par tâche. Créez la file d'attente, postez la description du travail dans la file d'attente, annoncez le nom de la file aux travailleurs. Si le travail doit être annulé avant d'être traité, supprimez la file d'attente du travail; Quand les travailleurs viennent chercher la description de travail, ils remarquent que la file d'attente a disparu. Plus léger et généralement meilleur serait d'utiliser redis ou un autre magasin de clé/valeur pour conserver l'état du travail (avec un enregistrement supprimé ou absent signifiant un travail annulé ou inexistant) et d'utiliser rabbitmq pour informer sur nouveau/supprimé/modifié enregistrements dans le magasin de clés/valeurs.

1

Vous devez vous abonner à toutes les files d'attente vers lesquelles les messages ont été acheminés et les consommer avec ack. Par exemple, si vous publiez un échange de sujet avec "test" comme clé de routage, et qu'il y a 3 files d'attente persistantes qui s'abonnent à "test", vous devrez consommer ces trois files d'attente. Il serait peut-être préférable d'ajouter une autre file d'attente que vos processus consommateurs écouteraient aussi, et dites-leur d'ignorer ces messages. Une alternative, puisque vous utilisez RabbitMQ, est d'écrire un plugin d'échange personnalisé qui acceptera certaines instructions hors bande pour effacer toutes les files d'attente. Par exemple vous pourriez avoir cet échange lire un en-tête spécial de message qui lui indique pour effacer toutes les files d'attente auxquelles ce message est destiné. Cela nécessite d'écrire du code Erlang, mais il y a 4 types d'échanges différents implémentés, vous n'avez donc besoin que de copier le code le plus similaire et d'écrire le code pour les nouveaux comportements. Si vous n'utilisez que des en-têtes personnalisés, le corps du message peut être un message normal pour les consommateurs.

Pour résumer:

1) l'éditeur a besoin de consommer les messages se 2) l'éditeur peut envoyer un message spécial dans une file d'attente spéciale pour dire aux consommateurs d'ignorer le message 3) l'éditeur peut envoyer un message spécial à un échange personnalisé qui effacera tous les messages existants des files d'attente avant d'envoyer ce message spécial aux consommateurs.

7

Je résoudrais ce scénario en demandant au travailleur de vérifier une sorte de source de données faisant autorité pour déterminer si le travail devrait ou non se poursuivre. Par exemple, le travailleur vérifie l'état du travail dans une base de données pour voir si le travail a déjà été annulé. Pour les scénarios où la vitesse de traitement des travaux peut être plus rapide que la vitesse avec laquelle le magasin faisant autorité peut être mis à jour et lu, un magasin de données moins garanti qui échange de la vitesse pour d'autres caractéristiques peut être utile. Un exemple de ceci serait d'utiliser Redis comme magasin pour annuler le traitement d'un message au lieu d'un DB relationnel comme MySQL. Redis est très rapide, mais donne moins de garanties sur les données qu'il contient, alors que MySQL est beaucoup plus lent, mais offre plus de garanties sur les données qu'il contient. En fin de compte, le concept de vérifier avec une autre source pour traiter ou non un message est le même, mais la façon dont vous implémentez cela dépend de votre scénario particulier.