2010-12-14 32 views
23

Est-il possible d'envoyer un message via RabbitMQ avec un certain retard? Par exemple je veux expirer une session client après 30 minutes, et j'envoie un message qui sera traité après 30 minutes.Message retardé dans RabbitMQ

+0

Avez-vous besoin d'utiliser RabbitMQ? –

+1

Oui, cette fonctionnalité est disponible depuis RabbitMQ-3.5.8. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar

+0

Si vous utilisez Spring AMQP, il y a [support pour le plugin] (https://docs.spring.io/spring-amqp/reference/htmlsingle/# delayed-message-exchange). – Gruber

Répondre

5

Ce n'est actuellement pas possible. Vous devez stocker vos horodatages d'expiration dans une base de données ou quelque chose de similaire, puis avoir un programme d'assistance qui lit ces horodatages et met en file d'attente un message.

Les messages différés sont une fonctionnalité souvent demandée, car ils sont utiles dans de nombreuses situations. Cependant, si votre besoin est d'expirer des sessions clientes, je crois que la messagerie n'est pas la solution idéale pour vous, et qu'une autre approche pourrait mieux fonctionner.

9

Avec la sortie de RabbitMQ v2.8, livraison prévue est maintenant disponible, mais comme une caractéristique indirecte: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

+0

J'ai essayé cette approche, mais a rencontré quelques problèmes, des suggestions quelqu'un? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 –

+5

J'ai fait un pic et j'ai frappé quelques showstoppers: 1. Les messages ne sont que DLQ: fr quand en haut de la section Q (http://www.rabbitmq.com/ttl.html - Avertissements) Cela signifie que si j'ai d'abord mis msg 1 pour expirer dans 4 heures et msg2 expirera dans 1 heure msg2 expirera seulement après l'expiration de msg1. 2. La durée de vie du message est conservée par Rabbit. Supposons que vous utilisez un délai d'attente de 10 secondes. Si le consommateur n'a pas été en mesure de consommer le message avec 10 secondes après son expiration (en raison d'un arriéré), il sera mis au rebut et perdu Ce qui précède a été vérifié avec Rabbit 3.0.1 Avez-vous les gars voir des solutions de contournement ? –

6

Comme je n'ai pas assez réputation pour ajouter un commentaire, affichant une nouvelle réponse. Ceci est juste un ajout à ce qui a déjà été discuté au http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

Sauf au lieu de définir des messages sur ttl, vous pouvez le définir au niveau de la file d'attente. Vous pouvez également éviter de créer un nouvel échange juste pour rediriger les messages vers différentes files d'attente. Voici un exemple de code java:

Producteur:

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import java.util.HashMap; 
import java.util.Map; 

public class DelayedProducer { 
    private final static String QUEUE_NAME = "ParkingQueue"; 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory connectionFactory = new ConnectionFactory(); 
     connectionFactory.setHost("localhost"); 
     Connection connection = connectionFactory.newConnection(); 
     Channel channel = connection.createChannel(); 

     Map<String, Object> arguments = new HashMap<String, Object>(); 
     arguments.put("x-message-ttl", 10000); 
     arguments.put("x-dead-letter-exchange", ""); 
     arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME); 
     channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 

     for (int i=0; i<5; i++) { 
      String message = "This is a sample message " + i; 
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
      System.out.println("message "+i+" got published to the queue!"); 
      Thread.sleep(3000); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

Consommateur:

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 

public class Consumer { 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     boolean autoAck = false; 
     channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer); 

     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      System.out.println(" [x] Received '" + message + "'"); 
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
     } 

    } 
} 
+0

Merci beaucoup. Je pense que vous avez une petite erreur dans la file d'attente du consommateur déclare channel.queueDeclare (QUEUE_NAME, false, false, false, null); Il devrait avoir le "DESTINATION_QUEUE_NAME" au lieu de "QUEUE_NAME". Vraiment merci beaucoup –

5

Il ressemble à this blog post décrit en utilisant l'échange de lettre morte et le message ttl à faire quelque chose de similaire.

Le code ci-dessous utilise CoffeeScript et Node.JS pour accéder à Rabbit et implémenter quelque chose de similaire.

amqp = require 'amqp' 
events = require 'events' 
em  = new events.EventEmitter() 
conn = amqp.createConnection() 

key = "send.later.#{new Date().getTime()}" 
conn.on 'ready', -> 
    conn.queue key, { 
    arguments:{ 
     "x-dead-letter-exchange":"immediate" 
    , "x-message-ttl": 5000 
    , "x-expires": 6000 
    } 
    }, -> 
    conn.publish key, {v:1}, {contentType:'application/json'} 

    conn.exchange 'immediate' 

    conn.queue 'right.now.queue', { 
     autoDelete: false 
    , durable: true 
    }, (q) -> 
    q.bind('immediate', 'right.now.queue') 
    q.subscribe (msg, headers, deliveryInfo) -> 
     console.log msg 
     console.log headers 
0

Supposons que vous avez le contrôle sur le consommateur, vous pouvez obtenir le dilatoire sur le consommateur comme ça ??:

Si nous sommes sûrs que le nième message dans la file d'attente a toujours un délai plus court que le n + 1ème message (cela peut être vrai pour de nombreux cas d'utilisation): Le producteur envoie timeInformation dans la tâche indiquant l'heure à laquelle ce travail doit être exécuté (currentTime + delay). Le consommateur:

1) Lit le scheduledTime de la tâche

2) si currentTime> scheduledTime aller de l'avant.

retard Else = scheduledTime - currentTime

sommeil pour le temps indiqué par le retard

Le consommateur est toujours configuré avec un paramètre d'accès concurrentiel. Ainsi, les autres messages vont simplement attendre dans la file d'attente jusqu'à ce qu'un consommateur finisse le travail. Donc, cette solution pourrait bien fonctionner, même si cela semble gênant, surtout pour les retards importants.

5

Grâce à la réponse de Norman, j'ai pu l'implémenter dans NodeJS.

Tout est clair à partir du code. Espérons que cela fera gagner du temps à quelqu'un.

var ch = channel; 
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false}); 
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false}); 

// setup intermediate queue which will never be listened. 
// all messages are TTLed so when they are "dead", they come to another exchange 
ch.assertQueue("my_intermediate_queue", { 
     deadLetterExchange: "my_final_delayed_exchange", 
     messageTtl: 5000, // 5sec 
}, function (err, q) { 
     ch.bindQueue(q.queue, "my_intermediate_exchange", ''); 
}); 

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { 
     ch.bindQueue(q.queue, "my_final_delayed_exchange", ''); 

     ch.consume(q.queue, function (msg) { 
      console.log("delayed - [x] %s", msg.content.toString()); 
     }, {noAck: true}); 
}); 
4

Il y a deux approches que vous pouvez essayer:

ancienne approche: le TTL (time to live) en-tête dans chaque message/file d'attente (politique) et mettent en place un DLQ pour y faire face. Une fois le ttl expiré, vos messages passeront de DLQ à la file d'attente principale pour que votre auditeur puisse le traiter.

Dernière approche: récemment RabbitMQ est venu avec RabbitMQ message retardé Plugin, l'aide que vous pouvez obtenir le même et ce support de plug-in disponible depuis RabbitMQ-3.5.8.

Vous pouvez déclarer un échange avec le type x-delayed-message et ensuite publier des messages avec l'en-tête personnalisé x-delay exprimant en millisecondes un délai pour le message. Le message sera livré aux files d'attente respectives après millisecondes x retard

Plus ici: git