SocketChannel : Problème d'écriture le cache n'est pas envoyé lorsque la socket n'est pas stimulée
Bonjour.
J'ai une problème avec les sockets de type SocketChannel et NIO en mode non bloquant.
Ce code permet de sérialiser binairement des classes avec une sérialisation perso (je sais que java a ses propres systèmes de sérialisation mais la question n'est pas là) dont le but est de s'échanger des classes entre C++ (QT) et Java (C++ <-> C++ et C++ <-> Java).
Le code créé fonctionne presque parfaitement.
Sauf lorsque je crée artificiellement 100000 envoie d'élément sur la socket et que le récepteur ne peut pas tout lire aussi rapidement (le principe est de tester que si l'on envoie plus vite que le récepteur ne peut lire, rien n'est perdu). Je tiens à préciser que la partie C++ -> C++ fonctionne parfaitement.
Je vous donne une partie du code source qui ne fonctionne pas...
Code:

|
Ici c'est le code writeToSocketde l'instance fSendMessagesManager (instance d'une inner class)
/**
* write data from socket, serialize all messages.
*
* @return true if the socket is full and some elements need to be send later, false if no more element to send.
* @throws IOException
*/
public boolean writeToSocket(SelectionKey aKey) throws IOException
{
int iNbMessagesToSend;
do
{
synchronized(m_lstMessagesToSend)
{ // Protect concurent access to m_lstMessagesToSend
iNbMessagesToSend = m_lstMessagesToSend.size();
if (flLengthMessage == 0 && iNbMessagesToSend>0)
{
ISerializationObject iMessage;
// Remove first element of the list
Iterator<ISerializationObject> it = m_lstMessagesToSend.iterator();
iMessage = it.next();
it.remove();
--iNbMessagesToSend;
// Serialize this message and add if after previous one if exists
int iCurrentPosition = fReadWriteBuffer.position(); // Record current position
fReadWriteBuffer.position(iCurrentPosition + 4); // Left space for message size
SerializationArchive ar = new SerializationArchive(SerializationArchive.SensArchive.Storing,fReadWriteBuffer);
ar.WriteObject(iMessage); // Serialize message
flLengthMessage = fReadWriteBuffer.position() - iCurrentPosition;
fReadWriteBuffer.putInt(iCurrentPosition,(int) flLengthMessage - 4); // Write message size before the serialized element
fReadWriteBuffer.flip();
}
}
if (flLengthMessage > 0)
{ // Something to write
SocketChannel socketChannel = (SocketChannel) aKey.channel();
// Write message content
int iNbBytesWrites = socketChannel.write(fReadWriteBuffer);
flLengthMessage -= iNbBytesWrites;
if (flLengthMessage == 0)
{ // If all is send, compact and prepare to send next
fReadWriteBuffer.compact(); // Suppress this message, ready to reading next one
}
else
{
int h= 0;
h++;
}
}
} // Continuer tant qu'il y a de la place et qu'il y a des messages en attente dans la file d'attente
while (flLengthMessage==0 && iNbMessagesToSend>0);
return flLengthMessage!=0 || iNbMessagesToSend>0;
}
FIN de Ici c'est le code writeToSocketde l'instance fSendMessagesManager (instance d'une inner class)
/**
* Write data from socket, serialize if needed.
*
* Write to socket next content to send.
*
* @param aKey Key to manage.
* @throws IOException
*/
private void writeToSocket(SelectionKey aKey) throws IOException
{
if (fSendMessagesManager.writeToSocket(aKey))
{ // More elements to write
aKey.interestOps(aKey.interestOps() | SelectionKey.OP_WRITE);
}
else
{ // No more elements to write
aKey.interestOps(aKey.interestOps() & ~SelectionKey.OP_WRITE);
}
}
private void processSelectionKey(SelectionKey selKey) throws Exception
{
if (!selKey.isValid())
{ // Disconnected socket
throw new IOException();
}
else
{
// Since the ready operations are cumulative,
// need to check readiness for each operation
if (selKey.isConnectable())
{
// Get channel with connection request
SocketChannel sChannel = (SocketChannel) selKey.channel();
try
{
if (sChannel.finishConnect())
{
selKey.interestOps(selKey.interestOps() & ~SelectionKey.OP_CONNECT);
selKey.interestOps(selKey.interestOps() | SelectionKey.OP_READ);
SwingUtilities.invokeLater(new Runnable()
{
@Override
public void run()
{
onSocketConnect();
}
});
}
}
catch(IOException ex)
{ // An error occurred : cannot connect : notify disconnection
throw new SocketDisconnectedException();
}
}
if (selKey.isReadable())
{
readFromSocketAndNotify(selKey);
}
if (selKey.isWritable())
{
writeToSocket(selKey);
}
}
}
/**
* Initialize the socket.
*
* Used when socket is accepted, to give socket to managers.
* Made automatically on client side on QTSOCKET_CONNECTION event,
* must be done manually on server side after the accept command.
*
* @param _pSocket Pointer on socket. Should be valid.
* @param _bDeleteWhenDisconnect Indicate if the socket should be deleted after disconnect.
*/
public void init(SocketChannel aSocket)
{
clear();
try
{
configureChannel(aSocket);
fSelector = Selector.open();
aSocket.register(fSelector, SelectionKey.OP_CONNECT);
Thread thread = new Thread(new Runnable()
{
@Override public void run()
{
SelectionKey selKey = null;
try
{
while (true)
{
// Wait for an event
fSelector.select();
// Get list of selection keys with pending events
@SuppressWarnings("rawtypes")
Iterator it = fSelector.selectedKeys().iterator();
// Process each key at a time
while (it.hasNext())
{
// Get the selection key
selKey = (SelectionKey) it.next();
// Remove it from the list to indicate that it is being
// processed
it.remove();
// If is connected or will be connected
processSelectionKey(selKey);
selKey = null;
}
}
}
catch(SocketDisconnectedException e)
{
if (selKey != null)
{ // Handle error with channel and unregister
selKey.cancel();
}
try
{
fTCPSocket.close();
}
catch (IOException ex)
{ // Don't care if exception is raised
}
SwingUtilities.invokeLater(new Runnable()
{
@Override
public void run()
{
onSocketDisconnect();
}
});
}
catch (Exception e)
{
if (selKey != null)
{ // Handle error with channel and unregister
selKey.cancel();
}
}
finally
{
clear();
}
}
}); // Wait for events
fTCPSocket = aSocket;
thread.start(); // Start the thread
}
catch (IOException e)
{
e.printStackTrace();
}
} |
En gros :
-> un thread créé dans init(SocketChannel aSocket) qui est en attente des différents evènements sockets appel processSelectionKey(selKey); sur chaque évènement.
-> Une fonction processSelectionKey(selKey) qui aiguille sur chaque fonction en fonction du type de notification.
-> Une fonction writeToSocket qui appelle l'instance fSendMessagesManager.writeToSocket(aKey) et qui, en fonction du retour set ou unset le SelectionKey.OP_WRITE pour être notifée lorsque de la place est disponible en sortie de la socket ou pas (en fonction de s'il a encore des éléments à envoyer ou pas).
-> la fonction writeToSocket de mon inner class qui contient une liste d'élément à sérialiser. S'il en reste un et que le buffer interne est à 0 (tout a été envoyé), il dépile le message à envoyer, le sérialise (SerializationArchive) puis l'envoie sur la socket. Si, en sortie du write sur la socket tout n'est pas envoyé la fonction quitte et le OP_WRITE sera mis à true, sinon, tant qu'il reste des éléments à envoyer et que la socket absorbe tout (le write retourne la taille buffer qu'il a a écrire indiquant qu'il a tout envoyé) il continue à dépiler et envoyer les éléments tant qu'il n'en reste plus.
Ceci fonctionne.... presque...
En fait, tant que j'envoie (100 000 éléments en quelques secondes), le programme C++ reçoit correctement. Lorsque le programme Java a tout envoyé, le programme C++ en est à 2500 environ.
A ce moment là, OP_WRITE est mis à 0 et la socket côté java est en attente ... normal ... sauf que le programme C++ arrête de recevoir !!! Et ça c'est pas normal !
Au début j'avais pensé que je perdais des informations... mais en fait non...
Si j'envoie régulièrement (toute les secondes par exemple) un buffer d'un octet à 0 alors la socket se déploque et le programme C++ lit le message 2501,2502,2503,etc... (qui fait 150 octets, donc le programme C++ peut lire 150 octets pour un octet à 0 écrit dans la socket côté Java) comme s'il fallait stimuler continuellement la socket pour qu'elle envoie encore ses données mises en cache...
La socket est configurée comme suit :
Code:
1 2 3 4 5 6 7 8 9 10 11
| protected void configureChannel(SocketChannel aChannel) throws IOException
{
aChannel.configureBlocking(false);
aChannel.socket().setSendBufferSize(0x10000); // 64Ko
aChannel.socket().setReceiveBufferSize(0x10000); // 64Ko
aChannel.socket().setKeepAlive(true);
aChannel.socket().setReuseAddress(true);
aChannel.socket().setSoLinger(false, 0);
aChannel.socket().setSoTimeout(0);
aChannel.socket().setTcpNoDelay(true);
} |