2010-06-21 19 views
11

Notez que je souhaite que plusieurs écouteurs de message traitent simultanément les messages successifs du sujet. De plus, j'aimerais que chaque écouteur de message fonctionne de manière transactionnelle de sorte qu'un échec de traitement dans un écouteur de message donné entraîne le maintien du message de cet écouteur sur le sujet.Comment puis-je gérer plusieurs messages simultanément à partir d'un sujet JMS (pas la file d'attente) avec java et spring 3.0?

Le ressort DefaultMessageListenerContainer semble prendre en charge la simultanéité pour les files d'attente JMS uniquement.

Ai-je besoin d'instancier plusieurs DefaultMessageListenerContainers?

Si le temps coule dans l'axe vertical:

ListenerA reads msg 1  ListenerB reads msg 2  ListenerC reads msg 3 
ListenerA reads msg 4  ListenerB reads msg 5  ListenerC reads msg 6 
ListenerA reads msg 7  ListenerB reads msg 8  ListenerC reads msg 9 
ListenerA reads msg 10  ListenerB reads msg 11  ListenerC reads msg 12 
... 

MISE À JOUR:
Merci pour vos commentaires @ T.Rob et @skaffman.

Ce que je fini par faire est la création de multiples DefaultMessageListenerContainers avec concurrency=1 et puis mettre logique dans l'auditeur de message de sorte que seul thread traiterait un identifiant de message donné.

+0

Pourriez-vous clarifier? Quand je vois "plusieurs écouteurs de message pour gérer des messages successifs du sujet simultanément", je pense que cela signifie que vous ne voulez pas que les auditeurs obtiennent chacun une copie du même message, mais plutôt qu'ils se disputent les mêmes messages. Est-ce exact? –

+0

Cela semble utile: http://bsnyderblog.blogspot.com/2010/05/tuning-jms-message-consumption-in.html – skaffman

Répondre

5

Vous ne voulez pas plusieurs instances DefaultMessageListenerContainer, non, mais vous avez besoin de configurer le DefaultMessageListenerContainer pour être en même temps, en utilisant le concurrentConsumers property:

Indiquez le nombre de concurrents consommateurs à créer. Par défaut est 1.

Spécification d'une valeur plus élevée pour ce paramètre augmentera le niveau niveau de simultanés consommateurs programmés à l'exécution: Ceci est efficacement le nombre minimum de consommateurs simultanés qui sera prévue à un moment donné . Ceci est un réglage statique ; pour la mise à l'échelle dynamique, vous devez spécifier le paramètre "maxConcurrentConsumers" à la place.

Augmenter le nombre de consommateurs simultanés est recommandable pour échelle la consommation de messages venant de la file d'attente. Cependant, notez que toutes les garanties de commande sont perdues une fois que plusieurs consommateurs sont enregistrés . En général, respectez le consommateur 1 pour les files d'attente à faible volume.

Cependant, il y a grand avertissement en bas:

Ne pas augmenter le nombre de consommateurs simultanés pour un sujet. Cela conduirait à la consommation simultanée du même message, ce qui est rarement souhaitable.

Ceci est intéressant et prend tout son sens lorsque vous y réfléchissez. La même chose se produirait si vous aviez plusieurs instances DefaultMessageListenerContainer.

Je pense que vous devez peut-être repenser votre conception, même si je ne suis pas sûr de ce que je suggérerais. La consommation simultanée de pub/sous-messages semble être une chose parfaitement raisonnable à faire, mais comment éviter d'avoir le même message délivré à tous vos consommateurs en même temps?

1

C'est l'une de ces occasions où les différences dans les fournisseurs de transport montent en flèche grâce à l'abstraction de JMS. JMS souhaite fournir une copie du message pour chaque abonné sur un sujet. Mais le comportement que vous voulez est vraiment celui d'une file d'attente. Je soupçonne qu'il existe d'autres exigences qui conduisent à une solution de pub/sous qui ne sont pas décrites - par exemple d'autres choses doivent s'abonner au même sujet indépendant de votre application. Si je devais le faire dans WebSphere MQ, la solution serait de créer un abonnement administratif qui entraînerait la mise en file d'attente d'une seule copie de chaque message sur le sujet donné. Ensuite, vos abonnés multiples pourraient rivaliser pour les messages sur cette file d'attente. De cette façon, votre application pourrait avoir plusieurs threads parmi lesquels les messages sont distribués, et en même temps d'autres abonnés indépendants de cette application pourraient (sous) s'abonner de façon dynamique au même sujet.

Malheureusement, il n'existe pas de méthode générique JMS portable pour cela. Vous dépendez beaucoup de la mise en œuvre du fournisseur de transport. WebSphere MQ est la seule personne à qui je peux parler mais je suis sûr que d'autres transports le supportent d'une manière ou d'une autre et à des degrés divers si vous êtes créatif.

+0

J'aime votre idée. Je suppose que nous pouvons l'implémenter sans lier à un fournisseur spécifique. Nous créons un sujet et un seul abonné pour cela. Cet abonné met le message du sujet dans une file d'attente et maintenant plusieurs consommateurs de file d'attente peuvent rivaliser pour cela. Il ajoute un niveau d'indirection, mais résout le problème de concurrence pour le sujet dans DMLC. – shrini1000

0

J'ai rencontré le même problème. J'étudie actuellement RabbitMQ, qui semble offrir une solution parfaite dans un modèle de conception qu'ils appellent des «files d'attente de travail». Plus d'infos ici: http://www.rabbitmq.com/tutorials/tutorial-two-java.html

Si vous n'êtes pas totalement lié à JMS, vous pouvez vous y intéresser. Il pourrait également y avoir un pont JMS vers AMQP, mais cela pourrait commencer à être hacky. Je m'amuse (lire: difficultés) à installer RabbitMQ et à fonctionner sur mon Mac mais pense que je suis sur le point de le faire fonctionner, je le posterai si je suis capable de le résoudre.

+0

Essayé et RabbitMQ fonctionne comme un charme. Ce n'est pas JMS, mais j'utilise Spring et le support Rabbit/AMQP est assez bon pour moi. – cobbzilla

+0

De toute façon dans mon expérience rabbitmq a des problèmes pour perdre des messages dans un écosystème clusterisé – deFreitas

-2

Je suis tombé sur cette question. Ma configuration est:

Créez un haricot avec id="DefaultListenerContainer", ajoutez la propriété name="concurrentConsumers" value="10" et la propriété name="maxConcurrentConsumers" value ="50".

Fonctionne bien, jusqu'à présent. J'ai imprimé l'ID de thread et vérifié que plusieurs threads sont créés et réutilisés.

+0

Vérifiez l'avertissement que skaffman mentionné dans sa réponse ci-dessus. – shrini1000

+0

Cette réponse contenait une promesse d'ajouter des tests de performance, mais cela n'a jamais été ajouté! J'ai supprimé ce texte, mais si vous souhaitez l'ajouter à un moment donné, n'hésitez pas. – halfer

1

Voici une possibilité:

1) créer un seul DMLC configuré avec le grain et la méthode pour gérer le message entrant. Définissez sa concurrence à 1.

2) Configurez un exécuteur de tâche avec ses #threads égaux à la concurrence que vous désirez. Créez un pool d'objets pour les objets supposés traiter un message. Donnez une référence de l'exécuteur de tâches et du pool d'objets au bean que vous avez configuré au n ° 1. Le pool d'objets est utile si le bean de traitement des messages n'est pas thread-safe.

3) Pour un message entrant, le bean dans DMLC crée un Runnable personnalisé, le pointe vers le message et le pool d'objets et le donne à l'exécuteur de tâches.

4) La méthode run de Runnable obtient un bean du pool d'objets et appelle sa méthode 'process' avec le message donné.

# 4 peut être géré avec un proxy et le pool d'objets pour le rendre plus facile.

Je n'ai pas encore essayé cette solution, mais elle semble correspondre à la facture. Notez que cette solution n'est pas aussi robuste que EJB MDB. Spring par ex. ne rejettera pas un objet du pool s'il lance une exception RuntimeException.

+2

Comment vous assurez-vous que les messages JMS entrants ne sont pas acquittés jusqu'à ce que Runnable se termine avec succès? –

1

Au moins dans ActiveMQ ce que vous voulez est totalement pris en charge, son nom est VirtualTopic

Le concept est le suivant:

  1. Vous créez un VirtualTopic (Il suffit de créer un sujet en utilisant le préfixe VirtualTopic.), par exemple . VirtualTopic.Color
  2. Créez un consommateur abonné à ce VirtualTopic correspondant à ce modèle Consumer.<clientName>.VirtualTopic.<topicName> par exemple. Consumer.client1.VirtualTopic.Color, Activemq créera une file d'attente avec ce nom et cette file d'attente s'abonnera à VirtualTopic.Color puis tous les messages publiés sur ce sujet virtuel seront envoyés à la file d'attente client1, notez que cela fonctionne comme des échanges rabbitmq.
  3. Vous avez terminé, vous pouvez maintenant consommer client1 file d'attente comme chaque file d'attente, avec de nombreux consommateurs, DLQ, la politique de re-livraison sur mesure, etc.
  4. À ce stade, je pense que vous avez compris que vous pouvez créer client2, client3 et combien abonnés que vous voulez, tous recevront une copie du message publié à VirtualTopic.Color

Voici le code

@Component 
public class ColorReceiver { 

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    // simply generating data to the topic 
    long id=0; 
    @Scheduled(fixedDelay = 500) 
    public void postMail() throws JMSException, IOException { 

     final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)]; 
     final Color color = new Color(++id, colorName.getName()); 
     final ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 
     message.setObject(color); 
     message.setProperty("color", color.getName()); 
     LOGGER.info("status=color-post, color={}", color); 
     jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message); 
    } 

    /** 
    * Listen all colors messages 
    */ 
    @JmsListener(
     destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer" 
     selector = "color <> 'RED'" 
    ) 
    public void genericReceiveMessage(Color color) throws InterruptedException { 
     LOGGER.info("status=GEN-color-receiver, color={}", color); 
    } 

    /** 
    * Listen only red colors messages 
    * 
    * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that 
    * the containers clientId need to be different between each other 
    */ 
    @JmsListener(
//  destination = "Consumer.redColorContainer.VirtualTopic.color", 
     destination = "Consumer.client1.VirtualTopic.color", 
     containerFactory = "redColorContainer", selector = "color='RED'" 
    ) 
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException { 
     LOGGER.info("status=RED-color-receiver, color={}", message.getObject()); 
    } 

    /** 
    * Listen all colors messages 
    */ 
    @JmsListener(
     destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer" 
    ) 
    public void genericReceiveMessage2(Color color) throws InterruptedException { 
     LOGGER.info("status=GEN-color-receiver-2, color={}", color); 
    } 

} 

@SpringBootApplication 
@EnableJms 
@EnableScheduling 
@Configuration 
public class Config { 

    /** 
    * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different 
    * clientIds per consumer pool (as two @JmsListener above, or two application instances) 
    * 
    */ 
    @Bean 
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
     DefaultJmsListenerContainerFactoryConfigurer configurer) { 

     final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setConcurrency("1-5"); 
     configurer.configure(factory, connectionFactory); 
     // container.setClientId("aId..."); lets spring generate a random ID 
     return factory; 
    } 

    @Bean 
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory, 
     DefaultJmsListenerContainerFactoryConfigurer configurer) { 

     // necessary when post serializable objects (you can set it at application.properties) 
     connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName())); 

     final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setConcurrency("1-2"); 
     configurer.configure(factory, connectionFactory); 
     return factory; 
    } 

} 

public class Color implements Serializable { 

    public static final Color WHITE = new Color("WHITE"); 
    public static final Color BLUE = new Color("BLUE"); 
    public static final Color RED = new Color("RED"); 

    private String name; 
    private long id; 

    // CONSTRUCTORS, GETTERS AND SETTERS 
}