Précédent   Forum des professionnels en informatique > Java > Général Java > APIs > IO
IO Vos questions sur les flux d'entrées/sorties, fichiers, traitements de données, protocoles réseau, ...
Partagez cette discussion sur d'autres réseaux sociaux : Viadeo Twitter Google Facebook Digg Delicious MySpace Yahoo
Réponse Proposer ce sujet en actualité
 
Outils de la discussion
Publicité
'
Vieux 09/02/2012, 11h28   #1
Membre du Club
 
Inscription : février 2007
Messages : 103
Détails du profil
Informations personnelles :
Âge : 41

Informations forums :
Inscription : février 2007
Messages : 103
Points : 48
Points : 48
Par défaut SocketChannel : Problème d'écriture le cache n'est pas envoyé lorsque la socket n'est pas stimulée

Bonjour.

J'ai une problème avec les sockets de type SocketChannel et NIO en mode non bloquant.
Ce code permet de sérialiser binairement des classes avec une sérialisation perso (je sais que java a ses propres systèmes de sérialisation mais la question n'est pas là) dont le but est de s'échanger des classes entre C++ (QT) et Java (C++ <-> C++ et C++ <-> Java).
Le code créé fonctionne presque parfaitement.
Sauf lorsque je crée artificiellement 100000 envoie d'élément sur la socket et que le récepteur ne peut pas tout lire aussi rapidement (le principe est de tester que si l'on envoie plus vite que le récepteur ne peut lire, rien n'est perdu). Je tiens à préciser que la partie C++ -> C++ fonctionne parfaitement.

Je vous donne une partie du code source qui ne fonctionne pas...
Code :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
Ici c'est le code writeToSocketde l'instance fSendMessagesManager (instance d'une inner class)
	    /**
	     * write data from socket, serialize all messages.
	     *
	     * @return true if the socket is full and some elements need to be send later, false if no more element to send. 
	     * @throws IOException
	     */
	    public boolean writeToSocket(SelectionKey aKey) throws IOException
	    {
	    	int iNbMessagesToSend;
            do
            {
    	    	synchronized(m_lstMessagesToSend)
    	    	{	// Protect concurent access to m_lstMessagesToSend
    	    		iNbMessagesToSend = m_lstMessagesToSend.size();
	                if (flLengthMessage == 0 && iNbMessagesToSend>0)
	                {
	                	ISerializationObject iMessage;
	                    // Remove first element of the list
	                	Iterator<ISerializationObject> it = m_lstMessagesToSend.iterator();
	                	iMessage = it.next();
	                	it.remove();
	                	--iNbMessagesToSend;
	                	
	                    // Serialize this message and add if after previous one if exists
	                    int iCurrentPosition = fReadWriteBuffer.position(); 	// Record current position
	                    fReadWriteBuffer.position(iCurrentPosition + 4); 		// Left space for message size
	                    SerializationArchive ar = new SerializationArchive(SerializationArchive.SensArchive.Storing,fReadWriteBuffer);
	                    ar.WriteObject(iMessage); // Serialize message

	                    flLengthMessage = fReadWriteBuffer.position() - iCurrentPosition;
	                    fReadWriteBuffer.putInt(iCurrentPosition,(int) flLengthMessage - 4); // Write message size before the serialized element
	                    fReadWriteBuffer.flip();
	            	}
    	    	}
                if (flLengthMessage > 0)
                {   // Something to write
                	SocketChannel socketChannel = (SocketChannel) aKey.channel();
                	// Write message content
                	int iNbBytesWrites  = socketChannel.write(fReadWriteBuffer);
                	flLengthMessage -= iNbBytesWrites;
					if (flLengthMessage == 0)
					{	// If all is send, compact and prepare to send next
						fReadWriteBuffer.compact(); // Suppress this message, ready to reading next one
					}
					else
					{
						int h= 0;
						h++;
					}
                }
            }   // Continuer tant qu'il y a de la place et qu'il y a des messages en attente dans la file d'attente
            while (flLengthMessage==0 && iNbMessagesToSend>0);

            return flLengthMessage!=0 || iNbMessagesToSend>0;
	    }
FIN de Ici c'est le code writeToSocketde l'instance fSendMessagesManager (instance d'une inner class)

	/**
	 * Write data from socket, serialize if needed.
	 *
	 * Write to socket next content to send.
	 *
	 * @param aKey Key to manage.
	 * @throws IOException 
	 */
	private void writeToSocket(SelectionKey aKey) throws IOException
	{
		if (fSendMessagesManager.writeToSocket(aKey))
		{	// More elements to write
			aKey.interestOps(aKey.interestOps() | SelectionKey.OP_WRITE);
		}
		else
		{	// No more elements to write
			aKey.interestOps(aKey.interestOps() & ~SelectionKey.OP_WRITE);
		}
	}


	private void processSelectionKey(SelectionKey selKey) throws Exception
	{
		if (!selKey.isValid())
		{	// Disconnected socket
			throw new IOException();
		}
		else
		{
			// Since the ready operations are cumulative,
			// need to check readiness for each operation
			if (selKey.isConnectable())
			{
				// Get channel with connection request
				SocketChannel sChannel = (SocketChannel) selKey.channel();
				
				try
				{
					if (sChannel.finishConnect())
					{
						selKey.interestOps(selKey.interestOps() & ~SelectionKey.OP_CONNECT);					
						selKey.interestOps(selKey.interestOps() | SelectionKey.OP_READ);
						SwingUtilities.invokeLater(new Runnable()
						{
							@Override
							public void run()
							{
								onSocketConnect();
							}
							
						});
					}
				}
				catch(IOException ex)
				{ 	// An error occurred : cannot connect : notify disconnection
					throw new SocketDisconnectedException();
				}
			}
			if (selKey.isReadable())
			{
				readFromSocketAndNotify(selKey);
			}
			if (selKey.isWritable())
			{
				writeToSocket(selKey);
			}
		}
	}


	/**
	* Initialize the socket.
	*
	* Used when socket is accepted, to give socket to managers.
	* Made automatically on client side on QTSOCKET_CONNECTION event,
	* must be done manually on server side after the accept command.
	*
	* @param _pSocket Pointer on socket. Should be valid.
	* @param _bDeleteWhenDisconnect Indicate if the socket should be deleted after disconnect.
	*/
	public void init(SocketChannel aSocket)
	{
		clear();
		try
		{
			configureChannel(aSocket);
			fSelector = Selector.open();
			aSocket.register(fSelector, SelectionKey.OP_CONNECT);

			Thread thread = new Thread(new Runnable()
			{
				@Override public void run()
				{
					SelectionKey selKey = null;
					try
					{
						while (true)
						{
							// Wait for an event
							fSelector.select();
							
							// Get list of selection keys with pending events
							@SuppressWarnings("rawtypes")
							Iterator it = fSelector.selectedKeys().iterator();
	
							// Process each key at a time
							while (it.hasNext())
							{
								// Get the selection key
								selKey = (SelectionKey) it.next();
	
								// Remove it from the list to indicate that it is being
								// processed 
								it.remove();
	
								// If is connected or will be connected
								processSelectionKey(selKey);
								selKey = null;
							}
						}
					}
					catch(SocketDisconnectedException e)
					{
						if (selKey != null)
						{	// Handle error with channel and unregister
							selKey.cancel();
						}
						try
						{
							fTCPSocket.close();
						}
						catch (IOException ex)
						{	// Don't care if exception is raised
						}
						SwingUtilities.invokeLater(new Runnable()
						{
							@Override
							public void run()
							{
								onSocketDisconnect();
							}
							
						});
					}
					catch (Exception e)
					{
						if (selKey != null)
						{	// Handle error with channel and unregister
							selKey.cancel();
						}
					}
					finally
					{
						clear();
					}
				}
			}); // Wait for events
			fTCPSocket = aSocket;
			thread.start(); // Start the thread
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}
En gros :
-> un thread créé dans init(SocketChannel aSocket) qui est en attente des différents evènements sockets appel processSelectionKey(selKey); sur chaque évènement.
-> Une fonction processSelectionKey(selKey) qui aiguille sur chaque fonction en fonction du type de notification.
-> Une fonction writeToSocket qui appelle l'instance fSendMessagesManager.writeToSocket(aKey) et qui, en fonction du retour set ou unset le SelectionKey.OP_WRITE pour être notifée lorsque de la place est disponible en sortie de la socket ou pas (en fonction de s'il a encore des éléments à envoyer ou pas).
-> la fonction writeToSocket de mon inner class qui contient une liste d'élément à sérialiser. S'il en reste un et que le buffer interne est à 0 (tout a été envoyé), il dépile le message à envoyer, le sérialise (SerializationArchive) puis l'envoie sur la socket. Si, en sortie du write sur la socket tout n'est pas envoyé la fonction quitte et le OP_WRITE sera mis à true, sinon, tant qu'il reste des éléments à envoyer et que la socket absorbe tout (le write retourne la taille buffer qu'il a a écrire indiquant qu'il a tout envoyé) il continue à dépiler et envoyer les éléments tant qu'il n'en reste plus.

Ceci fonctionne.... presque...

En fait, tant que j'envoie (100 000 éléments en quelques secondes), le programme C++ reçoit correctement. Lorsque le programme Java a tout envoyé, le programme C++ en est à 2500 environ.
A ce moment là, OP_WRITE est mis à 0 et la socket côté java est en attente ... normal ... sauf que le programme C++ arrête de recevoir !!! Et ça c'est pas normal !
Au début j'avais pensé que je perdais des informations... mais en fait non...
Si j'envoie régulièrement (toute les secondes par exemple) un buffer d'un octet à 0 alors la socket se déploque et le programme C++ lit le message 2501,2502,2503,etc... (qui fait 150 octets, donc le programme C++ peut lire 150 octets pour un octet à 0 écrit dans la socket côté Java) comme s'il fallait stimuler continuellement la socket pour qu'elle envoie encore ses données mises en cache...

La socket est configurée comme suit :
Code :
1
2
3
4
5
6
7
8
9
10
11
	protected void configureChannel(SocketChannel aChannel) throws IOException
	{	
		aChannel.configureBlocking(false);
		aChannel.socket().setSendBufferSize(0x10000);		// 64Ko
		aChannel.socket().setReceiveBufferSize(0x10000);	// 64Ko
		aChannel.socket().setKeepAlive(true);
		aChannel.socket().setReuseAddress(true);
		aChannel.socket().setSoLinger(false, 0);
		aChannel.socket().setSoTimeout(0);
		aChannel.socket().setTcpNoDelay(true);
	}
Feneck91 est déconnecté   Envoyer un message privé Réponse avec citation 00
Vieux 09/02/2012, 12h16   #2
Membre du Club
 
Inscription : février 2007
Messages : 103
Détails du profil
Informations personnelles :
Âge : 41

Informations forums :
Inscription : février 2007
Messages : 103
Points : 48
Points : 48
Bon, le fait de mettre ses idées au clair aide parfois...
J'ai trouvé la solution. Et cela venait du programme C++ sous QT.
En effet, à chaque notification de type READ je lisais un et un seul message alors que la socket devait en contenir plein.
Je pensais recevoir une autre notificationde la part de la socket plus tard car je n'avais pas tout lu (a priori c'est ce qui est fait sous wxWidgets car ce code C++ sous QT a été porté d'une lib existante sous wxWidgets avec laquelle je n'avais pas de problème de ce type (faudra que je reteste)), mais en fait non, je ne reçois plus de notification sauf en cas de nouvelles choses à lire...
Du coup, après correction, il s'avère que l'envoi prend beaucoup plus de temps (ce qui parait bien plus logique) et les deux programmes sont bien plus synchrones.
Le problème maintenant, c'est que si je reçoit trop de messages, je suis bloqué dans la lecture et je ne passe plus dans la boucle de messages...
A tester.
Désolé pour le dérangement.
Feneck91 est déconnecté   Envoyer un message privé Réponse avec citation 00
Réponse Proposer ce sujet en actualité Cette discussion est résolue.
Outils de la discussion



Fuseau horaire GMT +2. Il est actuellement 02h03.


 
 
 
 
Partenaires

Hébergement Web