IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Navigation

Inscrivez-vous gratuitement
pour pouvoir participer, suivre les réponses en temps réel, voter pour les messages, poser vos propres questions et recevoir la newsletter

Java EE Discussion :

Recevoir des messages postés pendant une période de déconnexion [JMS]


Sujet :

Java EE

  1. #1
    Membre du Club Avatar de Lovegiver
    Homme Profil pro
    Développeur Java
    Inscrit en
    Août 2015
    Messages
    81
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 52
    Localisation : France, Essonne (Île de France)

    Informations professionnelles :
    Activité : Développeur Java
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Août 2015
    Messages : 81
    Points : 57
    Points
    57
    Par défaut Recevoir des messages postés pendant une période de déconnexion
    Bonjour,

    je cherche à mettre en place un système de notifications basé sur JMS (ActiveMQ).

    Pour cela, j'ai 2 classes, le Sender et le Receiver.

    Le Receiver est de type "DurableConsumer" et la classe implémente MessageListener.
    La destination est un Topic.

    Le problème que je rencontre est le suivant : quand un message est envoyé par le Sender, celui-ci est stocké dans un Topic en attendant d'être consommé par les abonnés. Or quand j'exécute la classe Receiver, celle-ci ne dépile pas les notifs envoyées pendant qu'elle était "offline".

    Exemple :
    Sender envoie Message1 | 2 abonnés "offline" -> Dans la console ActiveMQ je vois bien 1 message "enqueued" pour 0 abonné
    Connexion Abonné1 et Abonné2
    Sender envoie Message2 | 2 abonnés "online" -> Dans la console Active MQ je vois bien 2 messages "enqueued" pour 2 abonnés mais seul le Message2 est "dequeued".

    Comment faire pour que, lorsque les abonnés se connectent, ils puissent recevoir les messages envoyés pendant leur absence ?


    La classe Sender

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
     
    package com.citizenweb.classes;
     
    import java.io.IOException;
    import java.util.Date;
    import java.util.Enumeration;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Topic;
    import javax.jms.QueueBrowser;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.ObjectMessage;
    import javax.management.AttributeNotFoundException;
    import javax.management.InstanceNotFoundException;
    import javax.management.MBeanException;
    import javax.management.MalformedObjectNameException;
    import javax.management.ReflectionException;
    import javax.naming.NamingException;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    import com.citizenweb.interfaces.PostIF;
    import com.citizenweb.interfaces.UserIF;
     
    public class Sender {
     
    	private ConnectionFactory factory = null;
    	private Connection connection = null;
    	private Session session = null;
    	private Destination destination = null;
    	private MessageProducer producer = null;
     
    	public Sender() {
     
    	}
     
    	public void connect(){
    		try{
    			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    			connection = factory.createConnection();
    			connection.start();
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		} catch (JMSException e){
    			e.printStackTrace();
    		}
    	}
     
    	public void sendPost(UserIF user,PostIF post) {
    		try {
    			connect();
    			destination = session.createTopic(user.toString());
    			producer = session.createProducer(destination);
    			ObjectMessage postMessage = session.createObjectMessage();
    			postMessage.setObject(post);
    			producer.send(postMessage);
    			System.out.println("Object message sent");
     
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	public void sendInformation(UserIF user,String info){
    		connect();
    		try {
    			destination = session.createTopic(user.toString());
    			producer = session.createProducer(destination);
    			TextMessage infoMessage = session.createTextMessage();
    			infoMessage.setText(info);
    			producer.send(infoMessage);
    			System.out.println("Information message sent");
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
     
    	/**
             * @param args
             * @throws Exception
             */
    	public static void main(String[] args) throws Exception {
    		Sender sender = new Sender();
    		Date date = new Date(1971,6,7);
    		UserIF user = new User("Nom1","Prenom1","login","mdp",date,"nom@prenom.fr");
    		PostIF post = new Post("Mon Post","Ceci est mon message",user,user,"Classe Sender",((User) user).getIdUser(),0);
    		sender.sendPost(user,post);
    		sender.session.close();
    		sender.connection.close();
     
    	}
     
    }

    La classe Receiver
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
     
    package com.citizenweb.classes;
     
     
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
     
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
    import com.citizenweb.interfaces.PostIF;
    import com.citizenweb.interfaces.UserIF;
    import com.citizenweb.classes.Post;
     
    public class Receiver implements MessageListener {
     
    	private ActiveMQConnectionFactory factory = null;
    	private ActiveMQConnection connection = null;
    	private ActiveMQSession session = null;
    	private Topic destination = null;
    	private MessageConsumer consumer = null;
     
    	UserIF userTopic = new User();
    	UserIF userSubscriber = new User();
    	String nomAbonnement = "";
    	List<Message> listeMsg = new ArrayList<Message>();
     
    	public Receiver(UserIF topic,UserIF subscriber) {
    		this.userTopic = topic;
    		this.userSubscriber = subscriber;
    		this.nomAbonnement=topic.toString()+"->"+subscriber.toString();
    	}
     
    	public void connect(){
    		try{
    			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    			// TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production
    			factory.setTrustAllPackages(true); 
    			connection = (ActiveMQConnection) factory.createConnection();
    			//ClientID : https://qnalist.com/questions/2068823/create-durable-topic-subscriber
    			connection.setClientID(userSubscriber.toString());
    			connection.start();
    			session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		} catch (JMSException e){
    			e.printStackTrace();
    		}
    	}
     
    	public void subscribeTopic(){
    		try {
    			if(session==null){connect();}
    			destination = session.createTopic(userTopic.toString());
    			consumer = session.createDurableSubscriber(destination,nomAbonnement);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	public void receiveMessage() {
    		try {
    			if(session==null){connect();}
    			subscribeTopic();
    			consumer.setMessageListener(this);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
     
    	@Override
    	public void onMessage(Message message) {
    		System.out.println("OnMessage triggered for " + userSubscriber.toString());
    		listeMsg.add(message);
    		System.out.println("Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size());
    		try{
    			if (message instanceof TextMessage) {
    				TextMessage text = (TextMessage) message;
    				System.out.println("Information : " + text.getText());
    			}
    			if (message instanceof ObjectMessage) {
    				System.out.println("ObjectMessage");
    				ObjectMessage oMessage = (ObjectMessage) message;
    				if (oMessage.getObject() instanceof PostIF){
    					PostIF post = (Post)oMessage.getObject();
    					String s = ((Post) post).getCorpsMessage();
    					System.out.println("Post : " + s);
    				}
    			}
    		} catch(JMSException e){
    			e.printStackTrace();
    		}
    	}
     
    	public static void main(String[] args) throws JMSException {
    		Date sub1Date = new Date(1950,0,20);
    		UserIF sub1User = new User("Abonné1","prénom1","login","mdp",sub1Date,"prenom@abonné1.fr");
    		Date sub2Date = new Date(1950,0,20);
    		UserIF sub2User = new User("Abonné2","prénom2","login","mdp",sub2Date,"prenom@abonné2.fr");
    		Date topDate = new Date(1971,6,7);
    		UserIF topUser = new User("Topic1","prénom3","login","mdp",topDate,"prenom@topic1.fr");
    		Receiver receiver1 = new Receiver(topUser,sub1User);
    		Receiver receiver2 = new Receiver(topUser,sub2User);
    		receiver1.receiveMessage();
    		receiver2.receiveMessage();
    	}
     
    }

    Merci d'avance pour votre aide.

  2. #2
    Membre régulier
    Homme Profil pro
    Expert Middleware
    Inscrit en
    Septembre 2006
    Messages
    75
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Nord (Nord Pas de Calais)

    Informations professionnelles :
    Activité : Expert Middleware

    Informations forums :
    Inscription : Septembre 2006
    Messages : 75
    Points : 100
    Points
    100
    Par défaut
    Si tu as un durable suscribers et qu'il s'est bien connecté une fois, tu devrais avoir tes messages dans ton topic.

    As-tu bien configuré une persistance des messages ? Tu peux aussi vérifier que ton durable suscribers soit bien enregistré et qu'il n'est pas été "désaboné" automatiquement. Tu peux modifier des options dans ce sens dans activeMQ : http://activemq.apache.org/manage-du...bscribers.html
    Quand on est mort, on ne sait pas qu’on est mort, c’est pour les autres que c’est difficile… Quand on est con, c’est pareil.

  3. #3
    Membre du Club Avatar de Lovegiver
    Homme Profil pro
    Développeur Java
    Inscrit en
    Août 2015
    Messages
    81
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 52
    Localisation : France, Essonne (Île de France)

    Informations professionnelles :
    Activité : Développeur Java
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Août 2015
    Messages : 81
    Points : 57
    Points
    57
    Par défaut
    Bonjour Syberi@

    tout d'abord merci d'avoir pris le temps de te pencher sur mon problème.

    Je viens de refaire une batterie de tests consistant en une déconnexion de mes "durable subscribers", l'envoi de plusieurs messages dans le topic puis une reconnexion.
    Lors de la reconnexion, je récupère bien les messages envoyés "en absence", ce qui est le fonctionnement nominal souhaité.

    Donc ce qui me perturbe, c'est que sans rien changer entre temps c'est de nouveau OK.
    Du coup, je me pose la question de savoir comment, quand des messages sont "enqueued" pas mais "dequeued", faire un nouvel envoi vers les subscribers afin que ceux-ci puissent être lus.
    -> comment lister les message "enqueued" mais pas "dequeued" ?

    Quand tu me parles de la persistence tu m'interpelles. Je pensais que le fait d'avoir des "durable subscribers" sur un topic impliquait la mise en oeuvre de la persistence par ActiveMQ.
    Y a-t-il autre chose à faire en matière de persistence ?

    Comme c'est mon premier projet JMS, je m'y perds un peu ^^

    Pendant que j'y suis, je suis en train de regarder un moyen de supprimer des topics. Il semblerait qu'il soit impératif qu'il n'y ait plus de subscribers, ce qui parait normal.
    Sais-tu où je pourrais trouver ce genre d'infos : pour un topic donné, lister tous ses subscribers, les désabonner puis virer le topic ?

    Merci encore.

  4. #4
    Membre régulier
    Homme Profil pro
    Expert Middleware
    Inscrit en
    Septembre 2006
    Messages
    75
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Nord (Nord Pas de Calais)

    Informations professionnelles :
    Activité : Expert Middleware

    Informations forums :
    Inscription : Septembre 2006
    Messages : 75
    Points : 100
    Points
    100
    Par défaut
    Je n'ai pas assez travaillé sur activeMq pour te répondre précisément mais d'aprés ce que dis cet url : http://activemq.apache.org/how-can-i...-a-broker.html, trois choix s'offre à toi : le dev, l'interface web ou le jmx (je prendrais cette dernière personnelement si j'avais à faire un choix).

    Pour la persistance, sauf si activeMQ est bien différent de ce que je connais mais je ne pense pas, tu peux avoir des topics non persistant. On parle ici de message qui serait perdu au redémarrage de ton activeMQ (Ecriture du message sur disque et non en mémoire).

    J'ai un peu de littérature sur activeMQ qui m'attend depuis un moment, je jète un oeil voir si je peux t'en dire plus.
    Quand on est mort, on ne sait pas qu’on est mort, c’est pour les autres que c’est difficile… Quand on est con, c’est pareil.

  5. #5
    Membre du Club Avatar de Lovegiver
    Homme Profil pro
    Développeur Java
    Inscrit en
    Août 2015
    Messages
    81
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 52
    Localisation : France, Essonne (Île de France)

    Informations professionnelles :
    Activité : Développeur Java
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Août 2015
    Messages : 81
    Points : 57
    Points
    57
    Par défaut
    Génial, merci pour ton aide.

    J'ai un peu avancé de mon côté pour ce qui est de "Unsubscribe" d'un Topic, mais cela implique que le Subscriber en question n'ait pas de session active.
    Pour tester, j'ai créé une classe "AdminJMS" (j'ai besoin que cela passe par du code, je ne peux pas le faire depuis la console d'AMQ):

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
     
    /**
     * 
     */
    package com.citizenweb.classes;
     
    import java.util.Date;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.ObjectMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
     
    import com.citizenweb.interfaces.PostIF;
    import com.citizenweb.interfaces.UserIF;
     
    /**
     * @author Fred
     *
     */
    public class AdminJMS {
     
    	private static ActiveMQConnectionFactory factory = null;
    	private static ActiveMQConnection connection = null;
    	private static ActiveMQSession session = null;
    	private Topic destination = null;
    	private MessageConsumer consumer = null;
     
    	static UserIF userTopic = new User();
    	static UserIF userSubscriber = new User();
    	static String nomAbonnement = "";
     
    	public AdminJMS() {
    	}
     
    	public static void connect() {
    		try {
    			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    			// TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production
    			factory.setTrustAllPackages(true);
    			connection = (ActiveMQConnection) factory.createConnection();
    			connection.setClientID(userSubscriber.toString());
    			connection.start();
    			session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	public static void unsubscribe(UserIF topic, UserIF subscriber) {
    		userTopic = topic;
    		userSubscriber = subscriber;
    		nomAbonnement = topic.toString() + "->" + subscriber.toString();
    		if (session == null) {
    			connect();
    		}
    		try {
    			session.unsubscribe(nomAbonnement);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	/**
             * @param args
             */
    	public static void main(String[] args) {
    		Date sub1Date = new Date(1950, 0, 20);
    		UserIF sub1User = new User("Abonné1", "prénom1", "login", "mdp", sub1Date, "prenom@abonné1.fr");
    		Date sub2Date = new Date(1950, 0, 20);
    		UserIF sub2User = new User("Abonné2", "prénom2", "login", "mdp", sub2Date, "prenom@abonné2.fr");
    		Date topDate = new Date(1971, 6, 7);
    		UserIF topUser = new User("Topic1", "prénom3", "login", "mdp", topDate, "prenom@topic1.fr");
    		unsubscribe(topUser,sub2User);
    	}
     
    }
    Je trouve un peu "lourd" qu'on ne puisse pas se désinscrire en indiquant le Topic lui-même mais le nom qu'on lui a donné, mais bon, de cette façon ça passe.
    Le seul hic, comme dit précédemment, c'est que cela ne fonctionne que lorsque l'abonné n'est pas en "écoute active", donc quand il est déconnecté.

    Certes cela parait logique. Toutefois, comme cette désinscription est à l'origine de l'abonné, cela génère des erreurs.
    Il faudrait donc que je trouve un moyen de supprimer le MessageListener afin que l'abonné ne soit pas "à l'écoute" au moment où il cherche à se désinscrire d'un topic.

    J'ai pas mal cherché sur Google, mais je ne trouve rien qui parle de supprimer un Listener. Etonnant...

  6. #6
    Membre du Club Avatar de Lovegiver
    Homme Profil pro
    Développeur Java
    Inscrit en
    Août 2015
    Messages
    81
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 52
    Localisation : France, Essonne (Île de France)

    Informations professionnelles :
    Activité : Développeur Java
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Août 2015
    Messages : 81
    Points : 57
    Points
    57
    Par défaut
    Ils sont géniaux sur StackOverFlow : tu cherches des réponses pendant des plombes, et quand tu t'apprêtes à poser une question, ils te sortent un post de derrière les fagots qui répond exactement au problème ^^

    Bon, j'ai bigrement progressé pour le "Unsubscribe" à l'initiative du DurableSubscriber.
    Comme vu sur SOF, il suffit de faire un consumer.close() avant de pouvoir "unsubscribe".

    Du coup ça donne :

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
     
    public void unsuscribe(UserIF topic, UserIF subscriber){
    		try {
    			consumer.close();
    			nomAbonnement = topic.toString() + "->" + subscriber.toString();
    			session.unsubscribe(nomAbonnement);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
     
    	}
    Encore une fois, je trouve dommage de ne pas pouvoir utiliser le nom de Topic lui-même mais tant que ça fonctionne !!

    Donc, au bout du compte, ça ne répond pas exactement à la question initiale qui était "comment supprimer un MessageListener ?", mais ça permet d'arriver à l'objectif qui est de se désinscrire d'un Topic alors qu'on est connecté.

  7. #7
    Membre du Club Avatar de Lovegiver
    Homme Profil pro
    Développeur Java
    Inscrit en
    Août 2015
    Messages
    81
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 52
    Localisation : France, Essonne (Île de France)

    Informations professionnelles :
    Activité : Développeur Java
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Août 2015
    Messages : 81
    Points : 57
    Points
    57
    Par défaut
    Je fais le bilan de mes recherches et de ma progression :

    - mon abonnement est tombé en marche
    - je peux "unsubscribe" un DurableSubscriber d'un Topic

    Il me reste donc à comprendre comment faire pour récupérer la liste des notifs qui, pour on ne sait quelle raison, pourraient rester bloquées et ne pas être acheminées aux destinataires (ma question initiale).
    -> pour un Subscriber donné à un Topic donné, faire le compte des messages "enqueued" mais pas "dequeued"

  8. #8
    Membre régulier
    Homme Profil pro
    Expert Middleware
    Inscrit en
    Septembre 2006
    Messages
    75
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Nord (Nord Pas de Calais)

    Informations professionnelles :
    Activité : Expert Middleware

    Informations forums :
    Inscription : Septembre 2006
    Messages : 75
    Points : 100
    Points
    100
    Par défaut
    Alors j'ai fait quelque recherches de mon coté et installé un activeMQ.

    Via l'interface tu peux visualiser pas mal d'information : http://localhost:8161/admin. même si c'est incomplet pour moi j'y ai trouvé les suscribers, enqueue, dequeue ect ...

    Si la question est pour le monitoring, tu pourrais peut-être trouver ton bonheur ici : http://activemq.apache.org/how-can-i...-activemq.html
    Quand on est mort, on ne sait pas qu’on est mort, c’est pour les autres que c’est difficile… Quand on est con, c’est pareil.

  9. #9
    Membre du Club Avatar de Lovegiver
    Homme Profil pro
    Développeur Java
    Inscrit en
    Août 2015
    Messages
    81
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 52
    Localisation : France, Essonne (Île de France)

    Informations professionnelles :
    Activité : Développeur Java
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Août 2015
    Messages : 81
    Points : 57
    Points
    57
    Par défaut
    Bonjour à tous,

    je reviens à la charge sur ce sujet qui se complique en fait.

    J'avais réussi à désinscrire des subscribers mais n'avais dans mes test que 2 subscribers et 2 topics.

    A présent, avec un 3ème subscriber, plus possible de désinscrire qui que ce soit.

    Chaque fois j'ai la même erreur : javax.jms.JMSException: Durable consumer is in use

    Pourtant ça n'a pas l'air compliqué :

    J'ai un Topic "X" auquel sont abonnés plusieurs DurableSubscribers.
    L'un des subscribers veut interrompre son abonnement.

    A savoir : c'est un projet de réseau social : les Topics sont en fait des utilisateurs. A chaque fois qu'ils font quelque chose, une notif est envoyée à leurs amis.

    Quand 2 personnes sont "amies", cela signifie qu'elles sont mutuellement abonnées aux notifs générées par l'autre personne. Elles peuvent rester "amies" en se désabonnant des notifications afin de ne pas se sentir spammées. Voilà à quoi doit servir la méthode "unsubscribe".

    Merci à tous ceux qui pourront m'éclaircir.

    Versions plus récentes des classes :

    SENDER :

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
     
    package com.citizenweb.classes;
     
    import java.util.Date;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageFormatException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.ObjectMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
     
    import com.citizenweb.interfaces.PostIF;
    import com.citizenweb.interfaces.UserIF;
     
    public class Sender {
     
    	private ActiveMQConnectionFactory factory = null;
    	private ActiveMQConnection connection = null;
    	private ActiveMQSession session = null;
    	private Destination destination = null;
    	private MessageProducer producer = null;
     
    	public Sender() {
    	}
     
    	public void connect(){
    		try{
    			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    			// TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production
    			factory.setTrustAllPackages(true);
    			connection = (ActiveMQConnection) factory.createConnection();
    			connection.start();
    			session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		} catch (JMSException e){
    			e.printStackTrace();
    		}
    	}
     
    	public void sendPost(UserIF user,PostIF post) {
    		if(session==null){connect();}
    		try {
    			destination = session.createTopic(user.toString());
    			producer = session.createProducer(destination);
    			ObjectMessage postMessage = session.createObjectMessage();
    			postMessage.setObject(post);
    			producer.send(postMessage);
    			System.out.println("\n SENDER Object message sent");
     
    		} catch (MessageFormatException e) {
    			e.printStackTrace();
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	public void sendInformation(UserIF user,String info){
    		if(session==null){connect();}
    		try {
    			destination = session.createTopic(user.toString());
    			producer = session.createProducer(destination);
    			TextMessage infoMessage = session.createTextMessage();
    			infoMessage.setText(info);
    			producer.send(infoMessage);
    			System.out.println("\n SENDER Information message sent");
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	/**
             * @param args
             * @throws Exception
             */
    	public static void main(String[] args) throws Exception {
     
    		UserIF u1, u2, u3;
    		String[] nom = new String[5];
    		String[] prenom = new String[5];
    		String[] login = new String[5];
    		String[] password = new String[5];
    		Date[] naiss = new Date[5];
    		String[] mail = new String[5];
    		for (int i = 0; i < 5; i++) {
    			nom[i] = "nom_" + i;
    			prenom[i] = "prenom_" + i;
    			login[i] = "login_" + i;
    			password[i] = "password_" + i;
    			naiss[i] = new Date();
    			mail[i] = "mail_" + i;
    		}
     
    		System.out.println("\n SENDER AFFECTATION DES NOMS");
    		u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
    		u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
    		u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
     
     
    		Sender sender = new Sender();
     
    		sender.sendInformation(u1, "U1 notification");
    		sender.sendInformation(u2, "U2 notification");
    		sender.sendInformation(u3, "U3 notification");
    		//PostIF post = new Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0);
    		//sender.sendPost(user, post);
    		sender.session.close();
    		sender.connection.close();
     
    	}
     
    }

    RECEIVER :

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
     
    package com.citizenweb.classes;
     
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.broker.region.Destination;
    import com.citizenweb.interfaces.PostIF;
    import com.citizenweb.interfaces.UserIF;
    import com.citizenweb.classes.Post;
     
    public class Receiver implements MessageListener, Serializable {
     
    	private static final long serialVersionUID = 1L;
    	private ActiveMQConnectionFactory factory = null;
    	private ActiveMQConnection connection = null;
    	private ActiveMQSession session = null;
    	private Topic destination = null;
    	private MessageConsumer consumer = null;
     
    	UserIF userTopic = new User();
    	UserIF userSubscriber = new User();
    	List<Message> listeMsg = new ArrayList<Message>();
     
    	public Receiver(UserIF subscriber) {
    		this.userSubscriber = subscriber;
    	}
     
    	public void connect() {
    		try {
    			factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    			// TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production
    			factory.setTrustAllPackages(true);
    			connection = (ActiveMQConnection) factory.createConnection();
    			// ClientID :
    			// https://qnalist.com/questions/2068823/create-durable-topic-subscriber
    			connection.setClientID(userSubscriber.toString());
    			connection.start();
    			session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	public void receiveMessage(UserIF topic) {
    		try {
    			if (session == null) {
    				connect();
    			}
    			destination = session.createTopic(topic.toString());
    			String nomAbonnement = topic.toString() + "->" + userSubscriber.toString();
    			//String nomAbonnement = userSubscriber.toString();
    			consumer = session.createDurableSubscriber(destination, nomAbonnement);
    			consumer.setMessageListener(this);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	public void unsubscribe(UserIF topic) {
    		try {
    			if (session == null) {
    				connect();
    			}
    			System.out.println("\n RECEIVER Désinscription du topic " + topic.toString());
    			//consumer.close();
    			String nomAbonnement = topic.toString() + "->" + userSubscriber.toString();
    			//String nomAbonnement = userSubscriber.toString();
    			System.out.println("\n RECEIVER Abonnement à clore = " + nomAbonnement);
    			consumer.close();
    			session.unsubscribe(nomAbonnement);
    			System.out.println("\n RECEIVER " + userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	@Override
    	public void onMessage(Message message) {
    		System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString());
    		listeMsg.add(message);
    		System.out.println("\n RECEIVER Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size());
    		String classe = message.getClass().getSimpleName();
    		System.out.println("\n RECEIVER Classe de message : " + classe);
    		try {
    			if (message instanceof TextMessage) {
    				TextMessage text = (TextMessage) message;
    				System.out.println("\n RECEIVER Information : " + text.getText());
    			}
    			if (message instanceof ObjectMessage) {
    				System.out.println("\n RECEIVER ObjectMessage");
    				ObjectMessage oMessage = (ObjectMessage) message;
    				if (oMessage.getObject() instanceof PostIF) {
    					PostIF post = (PostIF) oMessage.getObject();
    					String s = ((Post) post).getCorpsMessage();
    					System.out.println("\n RECEIVER Post : " + s);
    				}
    			}
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
     
    	public static void main(String[] args) throws JMSException {
     
    		/*
    		 * EACH USER IS A TOPIC FOR OTHER USERS
    		 * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS
    		*/
     
    		//CREATE USER
    		UserIF u1, u2, u3;
    		String[] nom = new String[5];
    		String[] prenom = new String[5];
    		String[] login = new String[5];
    		String[] password = new String[5];
    		Date[] naiss = new Date[5];
    		String[] mail = new String[5];
    		for (int i = 0; i < 5; i++) {
    			nom[i] = "nom_" + i;
    			prenom[i] = "prenom_" + i;
    			login[i] = "login_" + i;
    			password[i] = "password_" + i;
    			naiss[i] = new Date();
    			mail[i] = "mail_" + i;
    		}
     
    		u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
    		u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
    		u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
     
    		/*
    		 * MAKE EACH USER A SUBSCRIBER
    		 */
    		Receiver receiver1 = new Receiver(u1);
    		Receiver receiver2 = new Receiver(u2);
    		Receiver receiver3 = new Receiver(u3);
     
    		/*
    		 * PUT A MESSAGE LISTENER FOR EACH USER
    		 */
    		receiver1.receiveMessage(u2);
    		receiver1.receiveMessage(u3);
    		receiver2.receiveMessage(u1);
    		receiver2.receiveMessage(u3);
    		receiver3.receiveMessage(u1);
    		receiver3.receiveMessage(u2);
     
    		/*
    		 * CALL THE SENDER CLASS TO SEND MESSAGES
    		 */
    		try {
    			Sender.main(args);
    		} catch (Exception e1) {
    			e1.printStackTrace();
    		}
     
    		/*
    		 * A SLEEP TO HAVE ENOUGH TIME TO LOOK AT THE ACTIVEMQ CONSOLE
    		 * CAN BE REMOVE
    		 */
    		try {
    			Thread.sleep(10000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    			return;
    		}
     
    		/*
    		 * UNSUBSCRIBE SUBSCRIBERS FROM TOPICS
    		 */
    		receiver1.unsubscribe(u2);
    		receiver1.unsubscribe(u3);
    		receiver2.unsubscribe(u1);
    		receiver2.unsubscribe(u3);
    		receiver3.unsubscribe(u1);
    		receiver3.unsubscribe(u2);
    	}
     
    }

  10. #10
    Membre du Club Avatar de Lovegiver
    Homme Profil pro
    Développeur Java
    Inscrit en
    Août 2015
    Messages
    81
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 52
    Localisation : France, Essonne (Île de France)

    Informations professionnelles :
    Activité : Développeur Java
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Août 2015
    Messages : 81
    Points : 57
    Points
    57
    Par défaut
    Hello,

    I've got the solution and its explanation.

    Each connection needs <i>a unique ClientID</i> : <b>connection.setClientID("clientID");</b>
    My mistake was to understand this unicity for a given client.

    When a client subscribes to a Topic, there is one connection for this Topic. So, for a given client subscribed to 3 topics (for instance), 3 ClientID are needed because 3 connections are needed.
    A ClientID has to be unique because it identifies one connection of one client for one topic.

    That's why I had so many JMSExceptions.

    Thanx to all of you who gave me time and support.

+ Répondre à la discussion
Cette discussion est résolue.

Discussions similaires

  1. [2014] Afficher des données pendant une période, mais une valeur par jour
    Par Tymer dans le forum Développement
    Réponses: 3
    Dernier message: 22/04/2015, 15h41
  2. sélectionner des évènement que pendant une période temps donnée
    Par jenniferIUP dans le forum Requêtes et SQL.
    Réponses: 2
    Dernier message: 17/02/2009, 14h05
  3. DotMsn : Envoyer et recevoir des messages
    Par Couz02 dans le forum Framework .NET
    Réponses: 19
    Dernier message: 03/06/2008, 19h46
  4. Transmettre des messages udp via une page web
    Par DrMaboul dans le forum Général Conception Web
    Réponses: 0
    Dernier message: 02/04/2008, 21h33
  5. [Debutant] envoi d'un message POST avec une URL
    Par Yannos94 dans le forum Entrée/Sortie
    Réponses: 11
    Dernier message: 14/06/2006, 10h22

Partager

Partager
  • Envoyer la discussion sur Viadeo
  • Envoyer la discussion sur Twitter
  • Envoyer la discussion sur Google
  • Envoyer la discussion sur Facebook
  • Envoyer la discussion sur Digg
  • Envoyer la discussion sur Delicious
  • Envoyer la discussion sur MySpace
  • Envoyer la discussion sur Yahoo