2010-12-09 95 views
1

Je suis juste affaire à un nouveau scénario pour moi, que je crois peut-être commun à un certain :) ..Comment faire pour passer un JMS à un pont WebSphere MQ dans un appel synchrone à l'aide du modèle de demande-réponse?

Conformément aux exigences que je besoin de construire une expérience utilisateur d'être comme une transaction synchrone en ligne pour une appel de service Web, qui délègue l'appel à une série IBM MQ à l'aide d'un pont JMS-MQ asynchrone. Le client appelle le service Web et son message doit être publié dans une file d'attente JMS sur le serveur App qui sera remis à WebSphere MQ. Après le traitement, une réponse sera renvoyée au serveur App dans un point de terminaison de file d'attente FIXED JMS. .

L'exigence concerne cette transaction qui doit expirer au cas où WebSphere MQ ne fournit pas la réponse dans un délai défini, que le service Web envoie un signal de délai d'attente au client et ignore cette transaction.

L'esquisse du problème est la suivante.

Je dois bloquer la demande sur le service Web jusqu'à ce que la réponse arrive ou expire.

Que je suis à la recherche d'une bibliothèque ouverte pour m'aider dans cette tâche. Ou la seule solution est de bloquer un thread et de garder la mise en commun pour la réponse? Peut-être que je pourrais implémenter un bloc avec un écouteur pour être averti quand la réponse arrivera? Un peu de discussion serait très utile pour moi maintenant pour essayer d'effacer mes idées à ce sujet. Des suggestions?

J'ai un croquis que je l'espère aider à effacer l'image;)

alt text

Répondre

1

après quelques jours de codage je suis arrivé à une solution pour cela. J'utilise le standard EJB3 avec les annotations JAX-WS et le standard JMS.

Le code que j'ai écrit jusqu'à présent pour répondre aux exigences suivantes. Il s'agit d'un bean session sans état avec transaction bean (BMT) car l'utilisation de la transaction CMT (standart container managed transaction) provoquait une sorte de blocage, je crois parce que j'essayais de mettre les deux interactions JMS dans la même transaction que dans la même méthode donc notez que je devais commencer et terminer les transactions pour chaque interaction avec les files d'attente JMS. J'utilise weblogic pour cette solution. Et j'ai aussi codé un MDB qui consomme essentiellement le message du point de terminaison jms/Pergunta et place un message de réponse dans la file d'attente jms/Resposta que j'ai fait pour simuler le comportement attendu du côté MQ de ce problème. En fait, dans un scénario réel, nous aurions probablement une application COBOL sur le mainframe ou même une autre application Java traitant les messages et plaçant la réponse dans la file d'attente de réponse.

Si quelqu'un a besoin d'essayer ce code, il suffit simplement d'avoir un conteneur J2EE5 et de configurer 2 files d'attente avec des noms jndi: jms/Pergunta et jms/Resposta.

Le code EJB/Webservice:

@Stateless 
@TransactionManagement(TransactionManagementType.BEAN) 
@WebService(name="DJOWebService") 
public class DJOSessionBeanWS implements DJOSessionBeanWSLocal { 

    Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName()); 

    @Resource 
    SessionContext ejbContext; 

    // Defines the JMS connection factory. 
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory"; 

    // Defines request queue 
    public final static String QUEUE_PERG = "jms/Pergunta"; 

    // Defines response queue 
    public final static String QUEUE_RESP = "jms/Resposta"; 


    Context ctx; 
    QueueConnectionFactory qconFactory; 

    /** 
    * Default constructor. 
    */ 
    public DJOSessionBeanWS() { 
     log.info("Construtor DJOSessionBeanWS"); 
    } 

    @WebMethod(operationName = "processaMensagem") 
    public String processaMensagem(String mensagemEntrada, String idUnica) 
    { 
     //gets UserTransaction reference as this is a BMT EJB. 
     UserTransaction ut = ejbContext.getUserTransaction(); 
     try { 

      ctx = new InitialContext(); 
      //get the factory before any transaction it is a weblogic resource. 
      qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); 
      log.info("Got QueueConnectionFactory"); 
      ut.begin(); 
      QueueConnection qcon = qconFactory.createQueueConnection(); 
      QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
      Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta")); 
      TextMessage message = qsession.createTextMessage("this is a request message"); 
      message.setJMSCorrelationID(idUnica); 
      qsession.createSender(qs).send(message); 
      ut.commit(); 
      qcon.close(); 
      //had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required 
      ut.begin(); 
      QueueConnection queuecon = qconFactory.createQueueConnection(); 
      Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta")); 
      QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
      String messageSelector = "JMSCorrelationID = '" + idUnica + "'"; 
      //creates que receiver and sets a message selector to get only related message from the response queue. 
        QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector); 
      queuecon.start(); 
      //sets the timeout to keep waiting for the response... 
      TextMessage tresposta = (TextMessage) qr.receive(10000); 
      if(tresposta != null) 
      { 
       ut.commit(); 
       queuecon.close(); 
       return(tresposta.toString()); 
      } 
      else{ 
       //commints anyway.. does not have a response though 
       ut.commit(); 
       queuecon.close(); 
       log.info("null reply, returned by timeout.."); 
       return "Got no reponse message."; 
      } 



     } catch (Exception e) { 
      log.severe("Unexpected error occurred ==>> " + e.getMessage()); 
      e.printStackTrace(); 
      try { 
       ut.commit(); 
      } catch (Exception ex) { 
       ex.printStackTrace(); 
      } 
      return "Error committing transaction after some other error executing ==> " + e.getMessage(); 
     } 

    } 
} 

Et voici le code du MDB qui se moque du côté MQ de ce problème. J'ai eu un fragment Thread.sleep pendant mes tests pour simuler et tester le timeout côté client pour valider la solution mais il n'est pas présent dans cette version.

/** 
* Mock to get message from request queue and publish a new one on the response queue. 
*/ 
@MessageDriven(
     activationConfig = { @ActivationConfigProperty(
       propertyName = "destinationType", propertyValue = "javax.jms.Queue" 
     ) }, 
     mappedName = "jms/Pergunta") 
public class ConsomePerguntaPublicaRespostaMDB implements MessageListener { 

    Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName()); 

    // Defines the JMS connection factory. 
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory"; 

    // Define Queue de resposta 
    public final static String QUEUE_RESP = "jms/Resposta"; 


    Context ctx; 
    QueueConnectionFactory qconFactory; 



    /** 
    * Default constructor. 
    */ 
    public ConsomePerguntaPublicaRespostaMDB() { 
     log.info("Executou construtor ConsomePerguntaPublicaRespostaMDB"); 
     try { 
      ctx = new InitialContext(); 
     } catch (NamingException e) { 
      e.printStackTrace(); 
     } 
    } 

    /** 
    * @see MessageListener#onMessage(Message) 
    */ 
    public void onMessage(Message message) { 
     log.info("Recuperou mensagem da fila jms/FilaPergunta, executando ConsomePerguntaPublicaRespostaMDB.onMessage"); 
     TextMessage tm = (TextMessage) message; 

     try { 
      log.info("Mensagem recebida no onMessage ==>> " + tm.getText()); 

      //pega id da mensagem na fila de pergunta para setar corretamente na fila de resposta. 
      String idMensagem = tm.getJMSCorrelationID(); 
      log.info("Id de mensagem que sera usada na resposta ==>> " + idMensagem); 

      qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); 
      log.info("Inicializou contexto jndi e deu lookup na QueueConnectionFactory do weblogic com sucesso. Enviando mensagem"); 
      QueueConnection qcon = qconFactory.createQueueConnection(); 
      QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
      Queue queue = (Queue) (ctx.lookup("jms/Resposta")); 
      TextMessage tmessage = qsession.createTextMessage("Mensagem jms para postar na fila de resposta..."); 
      tmessage.setJMSCorrelationID(idMensagem); 
      qsession.createSender(queue).send(tmessage); 
     } catch (JMSException e) { 
      log.severe("Erro no onMessage ==>> " + e.getMessage()); 
      e.printStackTrace(); 
     } catch (NamingException e) { 
      log.severe("Erro no lookup ==>> " + e.getMessage()); 
      e.printStackTrace(); 
     } 

    } 

} 

[] s

2

Hey, merci pour poster votre propre solution!

Oui, recevoir() avec timeout est le moyen le plus élégant d'aller dans ce cas.

Méfiez-vous de ce qui se passe avec les messages qui ne sont pas lus à cause du délai d'expiration. Si votre client accède de nouveau à la même file d'attente, il peut récupérer un message périmé.

Assurez-vous que les messages de ce délai sont supprimés en temps opportun (si pour aucune autre raison, ne pas remplir la file d'attente avec des messages non traités). Vous pouvez le faire facilement par le biais du code (définition de la durée de vie sur le producteur de messages) ou sur le serveur WebSphere MQ (en utilisant des files d'attente qui expirent automatiquement les messages). Ce dernier est plus facile si vous ne pouvez pas/ne voulez pas modifier le côté MQ du code. C'est ce que je ferais :)

+0

Oui, nous avons une file d'erreurs où les messages expirés sont en avant et un autre morceau de code les traite. –