2010-03-18 15 views
13

J'utilise RabbitMQ sur RHEL 5.3 en utilisant le client Java. J'ai 2 noeuds (machines). Node1 consomme des messages à partir d'une file d'attente sur Node2 à l'aide de la classe auxiliaire Java QueueingConsumer.En utilisant RabbitMQ (client Java), existe-t-il un moyen de déterminer si la connexion réseau est fermée pendant la consommation?

QueueingConsumer consumer = new QueueingConsumer(channel); 
channel.basicConsume("MyQueueOnNode2", noAck, consumer); 
while (true) 
{ 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    ... Process message - delivery.getBody() 
} 

Si l'interface est désactivée sur Node1 ou Node2 (par exemple ifconfig eth1 vers le bas), le client (ci-dessus) ne sait jamais le réseau est plus là. RabbitMQ fournit-il un type de configuration sur le client Java qui peut être utilisé pour déterminer si la connexion a disparu. L'arrêt du serveur RabbitMQ sur Node2 déclenche une exception ShutdownSignalException, qui peut être interceptée et l'application peut entrer dans une boucle de reconnexion. Mais l'abaissement de l'interface ne provoque aucun type d'exception, donc le code attendra toujours sur consumer.nextDelivery().

J'ai également essayé d'utiliser la version timeout de cet appel. par exemple.

QueueingConsumer consumer = new QueueingConsumer(channel); 
channel.basicConsume("MyQueueOnNode2", noAck, consumer); 
int timeout_ms = 30000; 
while (true) 
{ 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms); 
    if (delivery == null) 
    { 
     if (channel.isOpen() == false)    // Seems to always return true 
     { throw new ShutdownSignalException(); } 
    } 
    else 
    { 
    ... Process message - delivery.getBody() 
    } 
} 

mais semble que cela retourne toujours vrai (même si l'interface est en panne). Je suppose que l'inscription pour le ShutdownListener sur la connexion donnera les mêmes résultats, mais je n'ai pas encore essayé.

Existe-t-il un moyen de configurer une sorte de pulsation, ou avez-vous juste besoin d'écrire une logique de location personnalisée (par exemple "Je suis là maintenant") pour que cela fonctionne?

Répondre

4

En général, vous êtes beaucoup mieux poster des questions concernant rabbitmq sur la liste de diffusion rabbitmq-discuss. Nous n'avons pas tendance à suivre les questions posées en dehors de cela.

Il y a une pulsation que vous pouvez configurer, même si elle est désactivée par défaut. Vous pouvez également activer TCP Keep Alive. Soit appeler setRequestedHeartbeat sur le ConnectionFactory avant de créer une nouvelle connexion, ou la sous-classe ConnectionFactory, substituer la méthode configureSocket, et appeler socket.setKeepAlive(true). Dans les deux cas, la connexion doit être détectée lorsque le réseau meurt.

3

En ce qui concerne la méthode isOpen, qui est bien décrit dans la documentation: http://www.rabbitmq.com/api-guide.html#shutdown-atomicity

En ce qui concerne l'arrêt: avec la fermeture node1 ou 2 vous dire le droit d'application, pas le serveur RabbitMQ lui-même? Pourquoi voudriez-vous savoir sur toute demande si une autre application se déconnecte du courtier de messages? Ce n'est pas le but de la messagerie.

La seule chose que vous pouvez faire, est d'envoyer des messages avec un paramètre « obligatoire ». Cela indique au serveur RabbitMQ vous vous attendez au moins 1 écouteur pour le message que vous avez envoyé (que ce soit une file d'attente directe ou une file d'attente dans un échange de sujet/fanout). Si le message ne peut alors pas être livré à une file d'attente, le message sera de retour à votre chaîne et transmis à ReturnListener donné.