Bonjour.

J'ai une problème avec les sockets de type SocketChannel et NIO en mode non bloquant.
Ce code permet de sérialiser binairement des classes avec une sérialisation perso (je sais que java a ses propres systèmes de sérialisation mais la question n'est pas là) dont le but est de s'échanger des classes entre C++ (QT) et Java (C++ <-> C++ et C++ <-> Java).
Le code créé fonctionne presque parfaitement.
Sauf lorsque je crée artificiellement 100000 envoie d'élément sur la socket et que le récepteur ne peut pas tout lire aussi rapidement (le principe est de tester que si l'on envoie plus vite que le récepteur ne peut lire, rien n'est perdu). Je tiens à préciser que la partie C++ -> C++ fonctionne parfaitement.

Je vous donne une partie du code source qui ne fonctionne pas...
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
Ici c'est le code writeToSocketde l'instance fSendMessagesManager (instance d'une inner class)
	    /**
	     * write data from socket, serialize all messages.
	     *
	     * @return true if the socket is full and some elements need to be send later, false if no more element to send. 
	     * @throws IOException
	     */
	    public boolean writeToSocket(SelectionKey aKey) throws IOException
	    {
	    	int iNbMessagesToSend;
            do
            {
    	    	synchronized(m_lstMessagesToSend)
    	    	{	// Protect concurent access to m_lstMessagesToSend
    	    		iNbMessagesToSend = m_lstMessagesToSend.size();
	                if (flLengthMessage == 0 && iNbMessagesToSend>0)
	                {
	                	ISerializationObject iMessage;
	                    // Remove first element of the list
	                	Iterator<ISerializationObject> it = m_lstMessagesToSend.iterator();
	                	iMessage = it.next();
	                	it.remove();
	                	--iNbMessagesToSend;
	                	
	                    // Serialize this message and add if after previous one if exists
	                    int iCurrentPosition = fReadWriteBuffer.position(); 	// Record current position
	                    fReadWriteBuffer.position(iCurrentPosition + 4); 		// Left space for message size
	                    SerializationArchive ar = new SerializationArchive(SerializationArchive.SensArchive.Storing,fReadWriteBuffer);
	                    ar.WriteObject(iMessage); // Serialize message

	                    flLengthMessage = fReadWriteBuffer.position() - iCurrentPosition;
	                    fReadWriteBuffer.putInt(iCurrentPosition,(int) flLengthMessage - 4); // Write message size before the serialized element
	                    fReadWriteBuffer.flip();
	            	}
    	    	}
                if (flLengthMessage > 0)
                {   // Something to write
                	SocketChannel socketChannel = (SocketChannel) aKey.channel();
                	// Write message content
                	int iNbBytesWrites  = socketChannel.write(fReadWriteBuffer);
                	flLengthMessage -= iNbBytesWrites;
					if (flLengthMessage == 0)
					{	// If all is send, compact and prepare to send next
						fReadWriteBuffer.compact(); // Suppress this message, ready to reading next one
					}
					else
					{
						int h= 0;
						h++;
					}
                }
            }   // Continuer tant qu'il y a de la place et qu'il y a des messages en attente dans la file d'attente
            while (flLengthMessage==0 && iNbMessagesToSend>0);

            return flLengthMessage!=0 || iNbMessagesToSend>0;
	    }
FIN de Ici c'est le code writeToSocketde l'instance fSendMessagesManager (instance d'une inner class)

	/**
	 * Write data from socket, serialize if needed.
	 *
	 * Write to socket next content to send.
	 *
	 * @param aKey Key to manage.
	 * @throws IOException 
	 */
	private void writeToSocket(SelectionKey aKey) throws IOException
	{
		if (fSendMessagesManager.writeToSocket(aKey))
		{	// More elements to write
			aKey.interestOps(aKey.interestOps() | SelectionKey.OP_WRITE);
		}
		else
		{	// No more elements to write
			aKey.interestOps(aKey.interestOps() & ~SelectionKey.OP_WRITE);
		}
	}


	private void processSelectionKey(SelectionKey selKey) throws Exception
	{
		if (!selKey.isValid())
		{	// Disconnected socket
			throw new IOException();
		}
		else
		{
			// Since the ready operations are cumulative,
			// need to check readiness for each operation
			if (selKey.isConnectable())
			{
				// Get channel with connection request
				SocketChannel sChannel = (SocketChannel) selKey.channel();
				
				try
				{
					if (sChannel.finishConnect())
					{
						selKey.interestOps(selKey.interestOps() & ~SelectionKey.OP_CONNECT);					
						selKey.interestOps(selKey.interestOps() | SelectionKey.OP_READ);
						SwingUtilities.invokeLater(new Runnable()
						{
							@Override
							public void run()
							{
								onSocketConnect();
							}
							
						});
					}
				}
				catch(IOException ex)
				{ 	// An error occurred : cannot connect : notify disconnection
					throw new SocketDisconnectedException();
				}
			}
			if (selKey.isReadable())
			{
				readFromSocketAndNotify(selKey);
			}
			if (selKey.isWritable())
			{
				writeToSocket(selKey);
			}
		}
	}


	/**
	* Initialize the socket.
	*
	* Used when socket is accepted, to give socket to managers.
	* Made automatically on client side on QTSOCKET_CONNECTION event,
	* must be done manually on server side after the accept command.
	*
	* @param _pSocket Pointer on socket. Should be valid.
	* @param _bDeleteWhenDisconnect Indicate if the socket should be deleted after disconnect.
	*/
	public void init(SocketChannel aSocket)
	{
		clear();
		try
		{
			configureChannel(aSocket);
			fSelector = Selector.open();
			aSocket.register(fSelector, SelectionKey.OP_CONNECT);

			Thread thread = new Thread(new Runnable()
			{
				@Override public void run()
				{
					SelectionKey selKey = null;
					try
					{
						while (true)
						{
							// Wait for an event
							fSelector.select();
							
							// Get list of selection keys with pending events
							@SuppressWarnings("rawtypes")
							Iterator it = fSelector.selectedKeys().iterator();
	
							// Process each key at a time
							while (it.hasNext())
							{
								// Get the selection key
								selKey = (SelectionKey) it.next();
	
								// Remove it from the list to indicate that it is being
								// processed 
								it.remove();
	
								// If is connected or will be connected
								processSelectionKey(selKey);
								selKey = null;
							}
						}
					}
					catch(SocketDisconnectedException e)
					{
						if (selKey != null)
						{	// Handle error with channel and unregister
							selKey.cancel();
						}
						try
						{
							fTCPSocket.close();
						}
						catch (IOException ex)
						{	// Don't care if exception is raised
						}
						SwingUtilities.invokeLater(new Runnable()
						{
							@Override
							public void run()
							{
								onSocketDisconnect();
							}
							
						});
					}
					catch (Exception e)
					{
						if (selKey != null)
						{	// Handle error with channel and unregister
							selKey.cancel();
						}
					}
					finally
					{
						clear();
					}
				}
			}); // Wait for events
			fTCPSocket = aSocket;
			thread.start(); // Start the thread
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}
En gros :
-> un thread créé dans init(SocketChannel aSocket) qui est en attente des différents evènements sockets appel processSelectionKey(selKey); sur chaque évènement.
-> Une fonction processSelectionKey(selKey) qui aiguille sur chaque fonction en fonction du type de notification.
-> Une fonction writeToSocket qui appelle l'instance fSendMessagesManager.writeToSocket(aKey) et qui, en fonction du retour set ou unset le SelectionKey.OP_WRITE pour être notifée lorsque de la place est disponible en sortie de la socket ou pas (en fonction de s'il a encore des éléments à envoyer ou pas).
-> la fonction writeToSocket de mon inner class qui contient une liste d'élément à sérialiser. S'il en reste un et que le buffer interne est à 0 (tout a été envoyé), il dépile le message à envoyer, le sérialise (SerializationArchive) puis l'envoie sur la socket. Si, en sortie du write sur la socket tout n'est pas envoyé la fonction quitte et le OP_WRITE sera mis à true, sinon, tant qu'il reste des éléments à envoyer et que la socket absorbe tout (le write retourne la taille buffer qu'il a a écrire indiquant qu'il a tout envoyé) il continue à dépiler et envoyer les éléments tant qu'il n'en reste plus.

Ceci fonctionne.... presque...

En fait, tant que j'envoie (100 000 éléments en quelques secondes), le programme C++ reçoit correctement. Lorsque le programme Java a tout envoyé, le programme C++ en est à 2500 environ.
A ce moment là, OP_WRITE est mis à 0 et la socket côté java est en attente ... normal ... sauf que le programme C++ arrête de recevoir !!! Et ça c'est pas normal !
Au début j'avais pensé que je perdais des informations... mais en fait non...
Si j'envoie régulièrement (toute les secondes par exemple) un buffer d'un octet à 0 alors la socket se déploque et le programme C++ lit le message 2501,2502,2503,etc... (qui fait 150 octets, donc le programme C++ peut lire 150 octets pour un octet à 0 écrit dans la socket côté Java) comme s'il fallait stimuler continuellement la socket pour qu'elle envoie encore ses données mises en cache...

La socket est configurée comme suit :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
	protected void configureChannel(SocketChannel aChannel) throws IOException
	{	
		aChannel.configureBlocking(false);
		aChannel.socket().setSendBufferSize(0x10000);		// 64Ko
		aChannel.socket().setReceiveBufferSize(0x10000);	// 64Ko
		aChannel.socket().setKeepAlive(true);
		aChannel.socket().setReuseAddress(true);
		aChannel.socket().setSoLinger(false, 0);
		aChannel.socket().setSoTimeout(0);
		aChannel.socket().setTcpNoDelay(true);
	}