2008-09-29 13 views
9

Filtrage du récepteur de message JMS par JMSCorrelationID

Comment puis-je instancier un écouteur de file d'attente JMS dans Java (JRE/JDK/J2EE 1.4) qui ne reçoit que les messages qui correspondent à un JMSCorrelationID donné? Les messages que je cherche à ramasser ont été publiés dans une file d'attente et non dans un sujet, bien que cela puisse changer si nécessaire.

Voici le code que j'utilise actuellement pour mettre le message dans la file d'attente:

/** 
* publishResponseToQueue publishes Requests to the Queue. 
* 
* @param jmsQueueFactory    -Name of the queue-connection-factory 
* @param jmsQueue     -The queue name for the request 
* @param response      -A response object that needs to be published 
* 
* @throws ServiceLocatorException  -An exception if a request message 
*          could not be published to the Topic 
*/ 
private void publishResponseToQueue(String jmsQueueFactory, 
            String jmsQueue, 
            Response response) 
     throws ServiceLocatorException { 

    if (logger.isInfoEnabled()) { 
     logger.info("Begin publishRequestToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + "," + response); 
    } 
    logger.assertLog(jmsQueue != null && !jmsQueue.equals(""), 
         "jmsQueue cannot be null"); 
    logger.assertLog(jmsQueueFactory != null && !jmsQueueFactory.equals(""), 
         "jmsQueueFactory cannot be null"); 
    logger.assertLog(response != null, "Request cannot be null"); 

    try { 

     Queue queue = (Queue)_context.lookup(jmsQueue); 

     QueueConnectionFactory factory = (QueueConnectionFactory) 
      _context.lookup(jmsQueueFactory); 

     QueueConnection connection = factory.createQueueConnection(); 
     connection.start(); 
     QueueSession session = connection.createQueueSession(false, 
            QueueSession.AUTO_ACKNOWLEDGE); 

     ObjectMessage objectMessage = session.createObjectMessage(); 

     objectMessage.setJMSCorrelationID(response.getID()); 

     objectMessage.setObject(response); 

     session.createSender(queue).send(objectMessage); 

     session.close(); 
     connection.close(); 

    } catch (Exception e) { 
     //XC3.2 Added/Modified BEGIN 
     logger.error("ServiceLocator.publishResponseToQueue - Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     throw new ServiceLocatorException("ServiceLocator.publishResponseToQueue " + 
              "- Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     //XC3.2 Added/Modified END 
    } 

    if (logger.isInfoEnabled()) { 
     logger.info("End publishResponseToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + response); 
    } 

} // end of publishResponseToQueue method 

Répondre

10

La configuration de la connexion de file d'attente est le même, mais une fois que vous avez la OueueSession, vous réglez le sélecteur lors de la création d'un récepteur.

QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'"); 

puis

receiver.receive() 

ou

receiver.setListener(myListener); 
+0

J'ai lu sur le même sujet récemment et poser une question comme suit: sera le récepteur encore recevoir ces messages qui ne contiennent pas l'identifiant de corrélation nécessaire et de les déposer en silence w/o le traitement, ou bien les Fournisseur JMS lui-même ne pas livrer de tels messages au récepteur, de sorte qu'ils restent toujours dans la file d'attente? Je pense que ce dernier est la bonne approche, mais je veux vérifier. Merci. – shrini1000

+0

@ shrini1000 vous avez raison. – Trying

5

BTW alors que son pas la question réelle que vous avez demandé - si vous essayez de mettre en œuvre une réponse à la demande sur JMS, je vous recommande reading this article comme L'API JMS est un peu plus complexe que vous ne pourriez l'imaginer et le faire efficacement est beaucoup plus difficile qu'il n'y paraît.

En particulier to use JMS efficiently vous devriez essayer d'éviter de créer des consommateurs pour un seul message, etc.

aussi parce que l'API JMS est donc très complexe à utiliser correctement et efficacement - en particulier avec la mise en commun, les transactions et le traitement simultané - Je recommande les gens hide the middleware from their application code tels que via l'utilisation Apache Camel's Spring Remoting implementation for JMS

+0

Je me serais sauvé beaucoup de réinvention de la roue, si j'avais connu Camel il y a quelques années. –

0
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; 
QueueReceiver receiver = session.createReceiver(queue, filter); 

ici, le récepteur va obtenir les messages pour lesquels JMSCorrelationID est égale à MessageID. ceci est très utile dans le paradigme demande/réponse.

ou vous pouvez définir directement ce à une valeur:

QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";); 

Que vous pouvez le faire soit receiver.receive(2000); ou receiver.setMessageListener(this);

2

Espérons que cela aidera. J'ai utilisé Open MQ.

package com.MQueues; 

import java.util.UUID; 

import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.QueueConnection; 
import javax.jms.QueueReceiver; 
import javax.jms.QueueSession; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import com.sun.messaging.BasicQueue; 
import com.sun.messaging.QueueConnectionFactory; 

public class HelloProducerConsumer { 

public static String queueName = "queue0"; 
public static String correlationId; 

public static String getCorrelationId() { 
    return correlationId; 
} 

public static void setCorrelationId(String correlationId) { 
    HelloProducerConsumer.correlationId = correlationId; 
} 

public static String getQueueName() { 
    return queueName; 
} 

public static void sendMessage(String threadName) { 
    correlationId = UUID.randomUUID().toString(); 
    try { 

     // Start connection 
     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
     BasicQueue destination = (BasicQueue) session.createQueue(threadName); 
     MessageProducer producer = session.createProducer(destination); 
     connection.start(); 

     // create message to send 
     TextMessage message = session.createTextMessage(); 
     message.setJMSCorrelationID(correlationId); 
     message.setText(threadName + "(" + System.currentTimeMillis() 
       + ") " + correlationId +" from Producer"); 

     System.out.println(correlationId +" Send from Producer"); 
     producer.send(message); 

     // close everything 
     producer.close(); 
     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void receivemessage(final String correlationId) { 
    try { 

     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 

     BasicQueue destination = (BasicQueue) session.createQueue(getQueueName()); 

     connection.start(); 

     System.out.println("\n"); 
     System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 
     long now = System.currentTimeMillis(); 

     // receive our message 
     String filter = "JMSCorrelationID = '" + correlationId + "'"; 
     QueueReceiver receiver = session.createReceiver(destination, filter); 
     TextMessage m = (TextMessage) receiver.receive(); 
     System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp()); 

     System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 

     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void main(String args[]) { 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId1 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId2 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId3 = getCorrelationId(); 


    HelloProducerConsumer.receivemessage(correlationId2); 

    HelloProducerConsumer.receivemessage(correlationId1); 

    HelloProducerConsumer.receivemessage(correlationId3); 
} 
}