Bonjour tout le monde,

J'ai un problème concernant l'utilisation des threads et de la synchronisation. Je dois implémenter en Java le fonctionnement de TCP en utilisant des datagrammes UDP. Pour cela, j'ai besoin de gérer un buffer dans lequel je dois pouvoir lire et écrire des données simultanément. J'utilise donc plusieurs threads différents qui accèdent aux mêmes variables, mais je ne vois pas comment synchroniser ces variables. (J'ai regardé comment utiliser le mot-clé synchronized, mais dans mon cas je ne vois pas comment l'utiliser.)

Les variables buffer, timer, lastSequenceNumberAcked, lastSequenceNumberSend, lastSequenceNumberWritten et stop doivent être synchronisées afin que les threads n'y accèdent pas simultanément.

Ci-dessous le code du client, qui charge un fichier dans un buffer (un thread), envoie les paquets au serveur (un autre thread) et attend les acks (un autre thread). Ces threads accèdent aux variables ci-dessus.

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import java.io.*;
import java.net.*;
import java.lang.*;
import java.util.*;
 
public class Client
{
 public static int port;
 public static InetAddress server;
 public static ClientBuffer buffer;
 public static FileInputStream dataFileInputStream;
 public static DatagramSocket socket;
 public static long timer;
 public static String retransmitPolicy;
 
 public static int lastSequenceNumberAcked = -1;
 public static int lastSequenceNumberSend = -1;
 public static int lastSequenceNumberWritten = -1;
 public static boolean stop = false;
 
public static DatagramPacket buildUDPPacket(int indexInBuffer,int sequenceNumber,int ackNumber){
	ClientBufferItem item = buffer.getClientBufferItemAtIndex(indexInBuffer);
	item.getPacket().setSequenceNumber(sequenceNumber);
	item.getPacket().setAckNumber(ackNumber);
	byte[] data = Tools.toBytes(item.getPacket());
	return new DatagramPacket(data,data.length,server,port);
}
 
 private static class SendingDataThread extends Thread{
	public void run(){
		timer = new java.util.Date().getTime();
		while(lastSequenceNumberAcked < lastSequenceNumberWritten || !stop){
			synchronized(buffer){
			long rto = new java.util.Date().getTime() - timer;
			//System.out.println("rto " + rto);
			if(retransmitPolicy.equals("F") && rto > 60){
				int itemIndexToResend = (buffer.lastItemAcked+1)%buffer.windowSize;
				try{
					socket.send(buildUDPPacket(itemIndexToResend,lastSequenceNumberAcked+1,0));
					System.out.println("Packet " + (lastSequenceNumberAcked+1) + " is RESENT");
				}catch(Exception e){e.printStackTrace();}
				timer = new java.util.Date().getTime();
 
			}
			if(retransmitPolicy.equals("B") && rto > 60){
				if(buffer.lastItemSend != buffer.lastItemAcked){
					if(buffer.lastItemSend > buffer.lastItemAcked)
						lastSequenceNumberSend -= (buffer.lastItemSend-(buffer.lastItemAcked));
					else
						lastSequenceNumberSend -= ((buffer.windowSize-(buffer.lastItemAcked+1))+ (buffer.lastItemSend));
					buffer.lastItemSend = (buffer.lastItemAcked+1)%buffer.windowSize;
				}
			}
			int itemIndexToSend = (buffer.lastItemSend+1)%buffer.windowSize;
			//Sytem.out.println("lastSequenceNumberSend " + lastSequenceNumberSend + " lastSequenceNumberAcked " + lastSequenceNumberAcked + " buffer.windowSize " + buffer.windowSize + " stop " + stop);
			if((lastSequenceNumberSend < lastSequenceNumberWritten  && ((lastSequenceNumberSend - lastSequenceNumberAcked) < buffer.windowSize)))
			{
				try{
					socket.send(buildUDPPacket(itemIndexToSend,lastSequenceNumberSend+1,0));	
				}catch(Exception e){e.printStackTrace();}
				timer = new java.util.Date().getTime();
				buffer.lastItemSend=itemIndexToSend;
				lastSequenceNumberSend++;
				System.out.println("Packet " + lastSequenceNumberSend + " is sent");
			}
			}
		}
 
    }
 }
 
 private static class ReceivingAckThread extends Thread{
	public void run(){
		byte[] tmp = new byte[1024];
		DatagramPacket p = new DatagramPacket(tmp,1024);
		while(true){
			try{
				socket.receive(p);
				timer = new java.util.Date().getTime();
			}catch(Exception e){e.printStackTrace();}
			UTCPPacket utcpp = (UTCPPacket)(Tools.toObject(p.getData()));
			int itemIntexWaitForAcked = (buffer.lastItemAcked+1)%buffer.windowSize;
			if(utcpp.getAckNumber() == lastSequenceNumberAcked+2){
				buffer.lastItemAcked = itemIntexWaitForAcked;
				lastSequenceNumberAcked++;
				System.out.println("Packet " + lastSequenceNumberAcked + " is acknowledged");
			}
		}
    }
 }
  private static class LoadingFileThread extends Thread{
	public void run(){
		try{
			while(dataFileInputStream.available() > 0){
				int itemIndexToWrite = (buffer.lastItemWritten+1)%buffer.windowSize;
				if((buffer.lastItemAcked ==-1 && lastSequenceNumberWritten < buffer.windowSize) || (buffer.lastItemAcked !=-1 && itemIndexToWrite != buffer.lastItemAcked)){
					dataFileInputStream.read(buffer.getClientBufferItemAtIndex(itemIndexToWrite).getPacket().getData());
					buffer.lastItemWritten = itemIndexToWrite;
					lastSequenceNumberWritten++;
					System.out.println("Packet " + lastSequenceNumberWritten + " is written");
				}
			}
		}catch(Exception e){e.printStackTrace();}
	}
 }
 
 
 public static void main(String argv[]) throws Exception
 {
 	if(argv.length == 4){
		//server address
	 	server = InetAddress.getByName(argv[0]);
		//port
	 	port = Integer.parseInt(argv[1]);
	 	//dataFile
	 	String dataFilePath = argv[2];
	 	File dataFile = new File(dataFilePath);
	 	dataFileInputStream = new FileInputStream(dataFile);
		//build buffer
		buffer = new ClientBuffer();
		//retransmit policy
		retransmitPolicy = argv[3];
	 	//UDP Socket creation
	 	socket = new DatagramSocket();
		LoadingFileThread LFT = new LoadingFileThread();
		SendingDataThread SDT = new SendingDataThread();
		ReceivingAckThread RAT = new ReceivingAckThread();
		LFT.start();
		SDT.start();
		RAT.start();
		LFT.join();
		stop=true;
 	}
 	else{
 		System.err.println("arguments error -> java Client ServerName PortNumber DataFilePath RetransmitPolicy");
 	}
 
 }
}
Si quelqu'un peut m'aider, je vous en serais très reconnaissant.

Merci d'avance.

JB