Bonjour,
je cherche à mettre en place un système de notifications basé sur JMS (ActiveMQ).
Pour cela, j'ai 2 classes, le Sender et le Receiver.
Le Receiver est de type "DurableConsumer" et la classe implémente MessageListener.
La destination est un Topic.
Le problème que je rencontre est le suivant : quand un message est envoyé par le Sender, celui-ci est stocké dans un Topic en attendant d'être consommé par les abonnés. Or quand j'exécute la classe Receiver, celle-ci ne dépile pas les notifs envoyées pendant qu'elle était "offline".
Exemple :
Sender envoie Message1 | 2 abonnés "offline" -> Dans la console ActiveMQ je vois bien 1 message "enqueued" pour 0 abonné
Connexion Abonné1 et Abonné2
Sender envoie Message2 | 2 abonnés "online" -> Dans la console Active MQ je vois bien 2 messages "enqueued" pour 2 abonnés mais seul le Message2 est "dequeued".
Comment faire pour que, lorsque les abonnés se connectent, ils puissent recevoir les messages envoyés pendant leur absence ?
La classe Sender
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 package com.citizenweb.classes; import java.io.IOException; import java.util.Date; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Topic; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.ObjectMessage; import javax.management.AttributeNotFoundException; import javax.management.InstanceNotFoundException; import javax.management.MBeanException; import javax.management.MalformedObjectNameException; import javax.management.ReflectionException; import javax.naming.NamingException; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.citizenweb.interfaces.PostIF; import com.citizenweb.interfaces.UserIF; public class Sender { private ConnectionFactory factory = null; private Connection connection = null; private Session session = null; private Destination destination = null; private MessageProducer producer = null; public Sender() { } public void connect(){ try{ factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e){ e.printStackTrace(); } } public void sendPost(UserIF user,PostIF post) { try { connect(); destination = session.createTopic(user.toString()); producer = session.createProducer(destination); ObjectMessage postMessage = session.createObjectMessage(); postMessage.setObject(post); producer.send(postMessage); System.out.println("Object message sent"); } catch (JMSException e) { e.printStackTrace(); } } public void sendInformation(UserIF user,String info){ connect(); try { destination = session.createTopic(user.toString()); producer = session.createProducer(destination); TextMessage infoMessage = session.createTextMessage(); infoMessage.setText(info); producer.send(infoMessage); System.out.println("Information message sent"); } catch (JMSException e) { e.printStackTrace(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { Sender sender = new Sender(); Date date = new Date(1971,6,7); UserIF user = new User("Nom1","Prenom1","login","mdp",date,"nom@prenom.fr"); PostIF post = new Post("Mon Post","Ceci est mon message",user,user,"Classe Sender",((User) user).getIdUser(),0); sender.sendPost(user,post); sender.session.close(); sender.connection.close(); } }
La classe Receiver
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 package com.citizenweb.classes; import java.util.ArrayList; import java.util.Date; import java.util.List; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import com.citizenweb.interfaces.PostIF; import com.citizenweb.interfaces.UserIF; import com.citizenweb.classes.Post; public class Receiver implements MessageListener { private ActiveMQConnectionFactory factory = null; private ActiveMQConnection connection = null; private ActiveMQSession session = null; private Topic destination = null; private MessageConsumer consumer = null; UserIF userTopic = new User(); UserIF userSubscriber = new User(); String nomAbonnement = ""; List<Message> listeMsg = new ArrayList<Message>(); public Receiver(UserIF topic,UserIF subscriber) { this.userTopic = topic; this.userSubscriber = subscriber; this.nomAbonnement=topic.toString()+"->"+subscriber.toString(); } public void connect(){ try{ factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production factory.setTrustAllPackages(true); connection = (ActiveMQConnection) factory.createConnection(); //ClientID : https://qnalist.com/questions/2068823/create-durable-topic-subscriber connection.setClientID(userSubscriber.toString()); connection.start(); session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e){ e.printStackTrace(); } } public void subscribeTopic(){ try { if(session==null){connect();} destination = session.createTopic(userTopic.toString()); consumer = session.createDurableSubscriber(destination,nomAbonnement); } catch (JMSException e) { e.printStackTrace(); } } public void receiveMessage() { try { if(session==null){connect();} subscribeTopic(); consumer.setMessageListener(this); } catch (JMSException e) { e.printStackTrace(); } } @Override public void onMessage(Message message) { System.out.println("OnMessage triggered for " + userSubscriber.toString()); listeMsg.add(message); System.out.println("Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size()); try{ if (message instanceof TextMessage) { TextMessage text = (TextMessage) message; System.out.println("Information : " + text.getText()); } if (message instanceof ObjectMessage) { System.out.println("ObjectMessage"); ObjectMessage oMessage = (ObjectMessage) message; if (oMessage.getObject() instanceof PostIF){ PostIF post = (Post)oMessage.getObject(); String s = ((Post) post).getCorpsMessage(); System.out.println("Post : " + s); } } } catch(JMSException e){ e.printStackTrace(); } } public static void main(String[] args) throws JMSException { Date sub1Date = new Date(1950,0,20); UserIF sub1User = new User("Abonné1","prénom1","login","mdp",sub1Date,"prenom@abonné1.fr"); Date sub2Date = new Date(1950,0,20); UserIF sub2User = new User("Abonné2","prénom2","login","mdp",sub2Date,"prenom@abonné2.fr"); Date topDate = new Date(1971,6,7); UserIF topUser = new User("Topic1","prénom3","login","mdp",topDate,"prenom@topic1.fr"); Receiver receiver1 = new Receiver(topUser,sub1User); Receiver receiver2 = new Receiver(topUser,sub2User); receiver1.receiveMessage(); receiver2.receiveMessage(); } }
Merci d'avance pour votre aide.
Partager