Bonjour,

je cherche à mettre en place un système de notifications basé sur JMS (ActiveMQ).

Pour cela, j'ai 2 classes, le Sender et le Receiver.

Le Receiver est de type "DurableConsumer" et la classe implémente MessageListener.
La destination est un Topic.

Le problème que je rencontre est le suivant : quand un message est envoyé par le Sender, celui-ci est stocké dans un Topic en attendant d'être consommé par les abonnés. Or quand j'exécute la classe Receiver, celle-ci ne dépile pas les notifs envoyées pendant qu'elle était "offline".

Exemple :
Sender envoie Message1 | 2 abonnés "offline" -> Dans la console ActiveMQ je vois bien 1 message "enqueued" pour 0 abonné
Connexion Abonné1 et Abonné2
Sender envoie Message2 | 2 abonnés "online" -> Dans la console Active MQ je vois bien 2 messages "enqueued" pour 2 abonnés mais seul le Message2 est "dequeued".

Comment faire pour que, lorsque les abonnés se connectent, ils puissent recevoir les messages envoyés pendant leur absence ?


La classe Sender

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
 
package com.citizenweb.classes;
 
import java.io.IOException;
import java.util.Date;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.ObjectMessage;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
import javax.naming.NamingException;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;
 
public class Sender {
 
	private ConnectionFactory factory = null;
	private Connection connection = null;
	private Session session = null;
	private Destination destination = null;
	private MessageProducer producer = null;
 
	public Sender() {
 
	}
 
	public void connect(){
		try{
			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e){
			e.printStackTrace();
		}
	}
 
	public void sendPost(UserIF user,PostIF post) {
		try {
			connect();
			destination = session.createTopic(user.toString());
			producer = session.createProducer(destination);
			ObjectMessage postMessage = session.createObjectMessage();
			postMessage.setObject(post);
			producer.send(postMessage);
			System.out.println("Object message sent");
 
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
 
	public void sendInformation(UserIF user,String info){
		connect();
		try {
			destination = session.createTopic(user.toString());
			producer = session.createProducer(destination);
			TextMessage infoMessage = session.createTextMessage();
			infoMessage.setText(info);
			producer.send(infoMessage);
			System.out.println("Information message sent");
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
 
 
	/**
         * @param args
         * @throws Exception
         */
	public static void main(String[] args) throws Exception {
		Sender sender = new Sender();
		Date date = new Date(1971,6,7);
		UserIF user = new User("Nom1","Prenom1","login","mdp",date,"nom@prenom.fr");
		PostIF post = new Post("Mon Post","Ceci est mon message",user,user,"Classe Sender",((User) user).getIdUser(),0);
		sender.sendPost(user,post);
		sender.session.close();
		sender.connection.close();
 
	}
 
}

La classe Receiver
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
 
package com.citizenweb.classes;
 
 
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;
import com.citizenweb.classes.Post;
 
public class Receiver implements MessageListener {
 
	private ActiveMQConnectionFactory factory = null;
	private ActiveMQConnection connection = null;
	private ActiveMQSession session = null;
	private Topic destination = null;
	private MessageConsumer consumer = null;
 
	UserIF userTopic = new User();
	UserIF userSubscriber = new User();
	String nomAbonnement = "";
	List<Message> listeMsg = new ArrayList<Message>();
 
	public Receiver(UserIF topic,UserIF subscriber) {
		this.userTopic = topic;
		this.userSubscriber = subscriber;
		this.nomAbonnement=topic.toString()+"->"+subscriber.toString();
	}
 
	public void connect(){
		try{
			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
			// TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production
			factory.setTrustAllPackages(true); 
			connection = (ActiveMQConnection) factory.createConnection();
			//ClientID : https://qnalist.com/questions/2068823/create-durable-topic-subscriber
			connection.setClientID(userSubscriber.toString());
			connection.start();
			session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e){
			e.printStackTrace();
		}
	}
 
	public void subscribeTopic(){
		try {
			if(session==null){connect();}
			destination = session.createTopic(userTopic.toString());
			consumer = session.createDurableSubscriber(destination,nomAbonnement);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
 
	public void receiveMessage() {
		try {
			if(session==null){connect();}
			subscribeTopic();
			consumer.setMessageListener(this);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
 
 
	@Override
	public void onMessage(Message message) {
		System.out.println("OnMessage triggered for " + userSubscriber.toString());
		listeMsg.add(message);
		System.out.println("Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size());
		try{
			if (message instanceof TextMessage) {
				TextMessage text = (TextMessage) message;
				System.out.println("Information : " + text.getText());
			}
			if (message instanceof ObjectMessage) {
				System.out.println("ObjectMessage");
				ObjectMessage oMessage = (ObjectMessage) message;
				if (oMessage.getObject() instanceof PostIF){
					PostIF post = (Post)oMessage.getObject();
					String s = ((Post) post).getCorpsMessage();
					System.out.println("Post : " + s);
				}
			}
		} catch(JMSException e){
			e.printStackTrace();
		}
	}
 
	public static void main(String[] args) throws JMSException {
		Date sub1Date = new Date(1950,0,20);
		UserIF sub1User = new User("Abonné1","prénom1","login","mdp",sub1Date,"prenom@abonné1.fr");
		Date sub2Date = new Date(1950,0,20);
		UserIF sub2User = new User("Abonné2","prénom2","login","mdp",sub2Date,"prenom@abonné2.fr");
		Date topDate = new Date(1971,6,7);
		UserIF topUser = new User("Topic1","prénom3","login","mdp",topDate,"prenom@topic1.fr");
		Receiver receiver1 = new Receiver(topUser,sub1User);
		Receiver receiver2 = new Receiver(topUser,sub2User);
		receiver1.receiveMessage();
		receiver2.receiveMessage();
	}
 
}

Merci d'avance pour votre aide.