2010-07-13 24 views
6

J'utilise ActiveMQ comme courtier pour livrer des messages. Ces messages sont destinés à être écrits dans une dabatase. Parfois, la base de données est inaccessible ou inaccessible. Dans ce cas, je veux annuler mon message pour réessayer plus tard ce message et je souhaite continuer à lire les autres messages.ActiveMQ: la file d'attente des lettres mortes permet de garder l'ordre de mes messages

Ce code fonctionne très bien, sauf un point: le message rollbacked me bloque à la lecture des autres:

private Connection getConnection() throws JMSException { 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setMaximumRedeliveries(3); // will retry 3 times to dequeue rollbacked messages 
    redeliveryPolicy.setInitialRedeliveryDelay(5 *1000); // will wait 5s to read that message 

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 
    Connection connection = connectionFactory.createConnection(); 
    ((ActiveMQConnection)connection).setUseAsyncSend(true); 
    ((ActiveMQConnection)connection).setDispatchAsync(true); 
    ((ActiveMQConnection)connection).setRedeliveryPolicy(redeliveryPolicy); 
    ((ActiveMQConnection)connection).setStatsEnabled(true); 
    connection.setClientID("myClientID"); 
    return connection; 
} 

Je crée ma session ainsi:

session = connection.createSession(true, Session.SESSION_TRANSACTED); 

Rollback est facile de demander :

session.rollback(); 

Imaginons que j'ai 3 messages dans ma file d'attente:

1: ok 
2: KO (will need to be treated again : the message I want to rollback) 
3: ok 
4: ok 

Mon consommateur fera (séquence linéaire):

commit 1 
rollback 2 
wait 5s 
rollback 2 
wait 5s 
rollback 2 
put 2 in dead letter queue (ActiveMQ.DLQ) 
commit 3 
commit 4 

Mais je veux:

commit 1 
rollback 2 
commit 3 
commit 4 
wait 5s 
rollback 2 
wait 5s 
rollback 2 
wait 5s 
put 2 in dead letter queue (ActiveMQ.DLQ) 

Alors, comment puis-je configurer mon consommateur pour retarder mes messages rollbacked plus tard?

Répondre

8

Ceci est en fait le comportement attendu, car les tentatives de messages sont gérées par le client, et non le courtier. Ainsi, puisque vous avez 1 session liée, et que votre politique de réessai est configurée pour les 3 tentatives avant DLQ, alors le processus entier de réessais bloque ce thread particulier. Donc, ma première question est que si l'insertion de la base de données échoue, ne vous attendez-vous pas à ce que toutes les autres insertions de la base de données échouent pour une raison similaire? Si ce n'est pas le cas, la manière de contourner le problème consiste à définir la politique de réessai pour cette file d'attente avec 0 nouvelle tentative, avec un DLQ spécifique, afin que les messages échouent immédiatement et passent dans DLQ. Ensuite, avoir un autre processus qui se retire de la DLQ toutes les 5 secondes et les retraites et/ou le remet dans la file d'attente principale pour le traitement.

+0

merci pour l'explication claire du comportement. Nous avons finalement choisi RabbitMQ comme courtier et implémenté le DLQ par nous-mêmes. –

0

Utilisez-vous le <strictOrderDispatchPolicy /> dans le fichier de configuration XML ActiveMQ? Je ne suis pas sûr si cela affectera l'ordre des messages pour la relivraison ou non. Si vous utilisez une répartition stricte des commandes, essayez de mettre en commentaire cette stratégie pour voir si cela modifie le comportement.

Bruce

+0

Non, je ne suis pas à l'aide de cette politique. Merci pour le lien. –

0

J'ai eu le même problème, je n'ai pas trouvé de solution ici, donc j'ai décidé de le poster ici après que j'en ai trouvé un pour les gens qui luttent avec le même. Ceci est corrigé avant la version 5.6 lorsque vous définissez la propriété nonBlockingRedelivery true dans l'usine de connexion:

<property name="nonBlockingRedelivery" value="true" />