Bonjour à tous,

j'ai implémenté une version des producteurs / consommateur fonctionnelle.
Mais à présent je voudrais faire la version ou les producteurs alternent entre deux types de messages.

Mais ce que j'ai fais ne semble pas être bon (bien qu'algorithmiquement sur le papier je ne vois pas le soucis).

En espérant que vous puissiez m'aider.

Je ne vois pas trop comment isoler le problème par conséquent je met l'ensemble du code pour la version 1 sans alternance de façon à ne pouvoir mettre que les modifications dans la version alternée sur laquelle je coince.


Version sans alternance:

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
 * Producteur-consommateur, base sans synchronisation
 * 
 * Compilation : gcc m1_ProdConso_base.c -lpthread [-DOptionDeTrace ...] -o prodconso
 *
 * Options de trace lors de la compilation (-D) : 
 * TRACE_BUF : tracer le contenu du buffer
 * TRACE_THD : tracer la creation des threads
 * TRACE_SOUHAIT : tracer ce que veulent faire les producteurs/consommateurs
 * */
 
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
 
#define NB_FOIS_PROD   2 //10 
#define NB_FOIS_CONSO  2 //10 
 
#define NB_PROD_MAX   20
#define NB_CONSO_MAX  20
 
#define NB_CASES_MAX  20 
 
typedef struct {
  char info[80];
  int  type;          // Message de 2 types (0/1 par exemple)
  int  rangProd;      // Qui a produit le message
} TypeMessage;
 
typedef struct {
  TypeMessage buffer[NB_CASES_MAX];  // Buffer
  int iDepot;                        // Indice prochain depot
  int iRetrait;			     // Indice prochain retrait
  int nb_pleine;
} RessourceCritique;                 // A completer eventuellement pour la synchro
 
// Variables partagees entre tous
RessourceCritique resCritiques; // Modifications donc conflits possibles
int nbCases;                    // Taille effective du buffer, 
                                // Pas de modif donc pas de conflit
typedef struct {                // Parametre des threads
  int rang;                     // - rang de creation
  int typeMsg;                  // - type de message a deposer/retirer (si besoin)
  int dep;
  int ret;
} Parametres;
 
/*Condition exo 1*/
pthread_cond_t autoDeposer = PTHREAD_COND_INITIALIZER;
pthread_cond_t autoRetirer = PTHREAD_COND_INITIALIZER;
/*Mutex des conditions exos 1*/
pthread_mutex_t emMoniteur = PTHREAD_MUTEX_INITIALIZER;//init à 1
 
 
 
 
/*---------------------------------------------------------------------*/
/* codeErr : code retournee par une primitive
 * msgErr  : message d'erreur personnalise
 * valErr  : valeur retournee par le thread
 */
void thdErreur(int codeErr, char *msgErr, int valeurErr) {
  int *retour = malloc(sizeof(int));
  *retour = valeurErr;
  fprintf(stderr, "%s: %d soit %s \n", msgErr, codeErr, strerror(codeErr));
  pthread_exit(retour);
}
 
/*--------------------------------------------------*/
void initialiserVarPartagees (void) {
  int i;
 
  /* Le buffer, les indices et le nombre de cases pleines */
  resCritiques.iDepot = 0;
  resCritiques.iRetrait = 0;
  resCritiques.nb_pleine=0;
  for (i = 0; i < nbCases; i++) {
    strcpy(resCritiques.buffer[i].info, "Message vide");
    resCritiques.buffer[i].type = 0;
    resCritiques.buffer[i].rangProd = -1;
  }
}
 
/*--------------------------------------------------*/
void afficherBuffer (void) {
  int i;
 
  printf("[ ");
  for (i = 0; i < nbCases; i++) {
    printf("[T%d] %s (de %d), ", 
            resCritiques.buffer[i].type,
            resCritiques.buffer[i].info, 
            resCritiques.buffer[i].rangProd);
  }
  printf("]\n");
}
 
/*--------------------------------------------------*/
void depot (TypeMessage leMessage) {
  strcpy(resCritiques.buffer[resCritiques.iDepot].info, leMessage.info);
  resCritiques.buffer[resCritiques.iDepot].type = leMessage.type;
  resCritiques.buffer[resCritiques.iDepot].rangProd = leMessage.rangProd;
  resCritiques.iDepot = (resCritiques.iDepot + 1) % nbCases;
#ifdef TRACE_BUF
  afficherBuffer();
#endif
}
 
/*--------------------------------------------------*/
void retrait (TypeMessage *leMessage) {
  strcpy(leMessage->info, resCritiques.buffer[resCritiques.iRetrait].info);
  leMessage->type = resCritiques.buffer[resCritiques.iRetrait].type;
  leMessage->rangProd = resCritiques.buffer[resCritiques.iRetrait].rangProd;
  resCritiques.iRetrait = (resCritiques.iRetrait + 1) % nbCases;
#ifdef TRACE_BUF
  afficherBuffer();
#endif
}
 
/*--------------------------------------------------
 * Correspondra au service du moniteur vu en TD
 * La synchronisation sera ajoutee dans cette operation
 * */
void deposer (TypeMessage leMessage, int rangProd) {
  while(resCritiques.nb_pleine==nbCases){
    pthread_cond_wait(&autoDeposer,&emMoniteur);
  }
  depot(leMessage);
  printf("\tProd %d : Message a ete depose = [T%d] %s (de %d)\n", 
         rangProd, leMessage.type, leMessage.info, leMessage.rangProd);
  resCritiques.nb_pleine++;
  pthread_cond_signal(&autoRetirer);
 
}  
 
/*--------------------------------------------------
 * Correspondra au service du moniteur vu en TD
 * La synchronisation sera ajoutee dans cette operation
 * */
void retirer (TypeMessage *unMessage, int rangConso) {
  while(resCritiques.nb_pleine==0){
    pthread_cond_wait(&autoRetirer,&emMoniteur);
  }
  retrait(unMessage);
  printf("\t\tConso %d : Message a ete lu = [T%d] %s (de %d)\n", 
         rangConso, unMessage->type, unMessage->info, unMessage->rangProd);
  resCritiques.nb_pleine--;
  pthread_cond_signal(&autoDeposer);
}  
 
/*--------------------------------------------------*/
void * producteur (void *arg) {
  int i;
  TypeMessage leMessage;
  Parametres param = *(Parametres *)arg;
 
  srand(pthread_self());
 
  //** Q1 : NB_FOIS_PROD a remplacer par le nouveau parametre du main
  for (i = 0; i < param.dep; i++) {
    sprintf (leMessage.info, "%s %d %s %d", "bonjour num ", i, "de prod ", param.rang);
    leMessage.type = param.typeMsg;
    leMessage.rangProd = param.rang;
 
#ifdef TRACE_SOUHAIT
    printf("\t\tProd %d : Je veux deposer = [T%d] %s (de %d)\n", 
         param.rang, leMessage.type, leMessage.info, leMessage.rangProd);
#endif
    pthread_mutex_lock (&emMoniteur);
    deposer(leMessage, param.rang);
    pthread_mutex_unlock (&emMoniteur);
    //usleep(rand()%(100 * param.rang + 100));
  }
  pthread_exit(NULL);
}
 
/*--------------------------------------------------*/
void * consommateur (void *arg) {
  int i;
  TypeMessage unMessage;
  Parametres *param = (Parametres *)arg;
 
  srand(pthread_self());
 
  //** Q1 : NB_FOIS_CONSO a remplacer par le nouveau parametre du main
  for (i = 0; i < param->ret; i++) {
 
#ifdef TRACE_SOUHAIT
    printf("\t\tConso %d : Je veux retirer un message \n", param->rang);
#endif
    pthread_mutex_lock (&emMoniteur);
    retirer(&unMessage, param->rang);
    pthread_mutex_unlock (&emMoniteur);
 
    //usleep(rand()%(100 * param->rang + 100));
  }
  pthread_exit(NULL);
}
 
/*--------------------------------------------------*/
int main(int argc, char *argv[]) {
  int i, etat;
  int nbThds, nbProd, nbConso;
  int nbDep, nbRet;
  Parametres paramThds[NB_PROD_MAX + NB_CONSO_MAX];
  pthread_t idThdProd[NB_PROD_MAX], idThdConso[NB_CONSO_MAX];
 
  if (argc <= 5) {
    printf ("Usage: %s <Nb Prod <= %d> <Nb Conso <= %d> <Nb Cases <= %d>, <Nb Dépot> <Nb Retrait>, \n", 
             argv[0], NB_PROD_MAX, NB_CONSO_MAX, NB_CASES_MAX);
    exit(2);
  }
 
  nbProd  = atoi(argv[1]);
  if (nbProd > NB_PROD_MAX)
    nbProd = NB_PROD_MAX;
  nbConso = atoi(argv[2]);
  if (nbConso > NB_CONSO_MAX)
    nbConso = NB_CONSO_MAX;
  nbThds = nbProd + nbConso;
  nbCases = atoi(argv[3]);
  if (nbCases > NB_CASES_MAX)
    nbCases = NB_CASES_MAX;
  nbDep=atoi(argv[4]);
  nbRet=atoi(argv[5]);
  // Q1 : ajouter 2 parametres :
  // -  nombre de depots a faire par un producteur
  // -  nombre de retraits a faire par un consommateur
 
  initialiserVarPartagees();
 
  /* Creation des nbProd producteurs et nbConso consommateurs */
  for (i = 0; i <  nbThds; i++) {
    if (i < nbProd) {
      paramThds[i].typeMsg = i%2;
      paramThds[i].rang = i;
      paramThds[i].dep = nbDep;
      if ((etat = pthread_create(&idThdProd[i], NULL, producteur, &paramThds[i])) != 0)
        thdErreur(etat, "Creation producteur", etat);
#ifdef TRACE_THD
      printf("Creation thread prod %lu de rang %d -> %d/%d\n", idThdProd[i], i, paramThds[i].rang, nbProd);
#endif
    }
    else {
      paramThds[i].typeMsg = i%2;
      paramThds[i].rang = i - nbProd;
      paramThds[i].ret = nbRet;
      if ((etat = pthread_create(&idThdConso[i-nbProd], NULL, consommateur, &paramThds[i])) != 0)
        thdErreur(etat, "Creation consommateur", etat);
#ifdef TRACE_THD
      printf("Creation thread conso %lu de rang %d -> %d/%d\n", idThdConso[i-nbProd], i, paramThds[i].rang, nbConso);
#endif
    }
  }
 
  /* Attente de la fin des threads */
  for (i = 0; i < nbProd; i++) {
    if ((etat = pthread_join(idThdProd[i], NULL)) != 0)
      thdErreur(etat, "Join threads producteurs", etat);
#ifdef TRACE_THD
    printf("Fin thread producteur de rang %d\n", i);
#endif
  }
 
  for (i = 0; i < nbConso; i++) {
    if ((etat = pthread_join(idThdConso[i], NULL)) != 0) 
      thdErreur(etat, "Join threads consommateurs", etat);
#ifdef TRACE_THD
    printf("Fin thread consommateur de rang %d\n", i);
#endif
  }
 
#ifdef TRACE_THD
  printf ("\nFin de l'execution du main \n");
#endif
 
  return 0;
}

Version avec alternance

Niveau variable dans mes ressources critiques j'ai ajouté un attribut dernier_type pour stocker le type du dernier dépôt effectué

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
typedef struct {
  TypeMessage buffer[NB_CASES_MAX];  // Buffer
  int iDepot;                        // Indice prochain depot
  int iRetrait;			     // Indice prochain retrait
  int nb_pleine;
  int dernier_type;
} RessourceCritique;
Là ou dans la première version j'avais une seule condition pour déposer; j'en ai désormais 2 (une pour chaque type).
Notez que je ne suis pas sûr de mon initialisation ici, j'ai peur que ce que je fasse en réalité soit de donner un "jeton" aux deux conditions dès le début alors que je devrais seulement en donner un.
Code : Sélectionner tout - Visualiser dans une fenêtre à part
pthread_cond_t autoDeposer[2] = { PTHREAD_COND_INITIALIZER };
Dans la fonction d'initialisation je rajoute l'alternance de type (0 ou 1)

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void initialiserVarPartagees (void) {
  int i;
  int type_message;
  /* Le buffer, les indices et le nombre de cases pleines */
  resCritiques.iDepot = 0;
  resCritiques.iRetrait = 0;
  resCritiques.nb_pleine=0;
  resCritiques.dernier_type=0;
  for (i = 0; i < nbCases; i++) {
    type_message=(i%2);
    resCritiques.buffer[i].type = type_message;
    resCritiques.buffer[i].info=strcat("Message vide de type ",(char *)(type_message));
    resCritiques.buffer[i].rangProd = -1;
  }
}

Lorsque je dépose, on a en plus du buffer plein la condition de blocage si le type du message est le même type que le dernier type.
On débloque toujours les consommateurs mais en plus on débloque les producteurs du type opposé.

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void deposer (TypeMessage leMessage, int rangProd) {
  while((resCritiques.nb_pleine==nbCases) || 
        (leMessage.type == resCritiques.dernier_type)){
          pthread_cond_wait(&autoDeposer[leMessage.type],&emMoniteur);
  }
  depot(leMessage);
  resCritiques.dernier_type=leMessage.type;
  printf("\tProd %d : Message a ete depose = [T%d] %s (de %d)\n", 
         rangProd, leMessage.type, leMessage.info, leMessage.rangProd);
  resCritiques.nb_pleine++;
  pthread_cond_signal(&autoRetirer);
  pthread_cond_signal(&autoDeposer[(resCritiques.dernier_type+1)%2]);
 
}
En ce qui concerne le retrait: on réveille un producteur du type différent du dernier dépot.

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
void retirer (TypeMessage *unMessage, int rangConso) {
  while(resCritiques.nb_pleine==0){
    pthread_cond_wait(&autoRetirer,&emMoniteur);
  }
  retrait(unMessage);
  printf("\t\tConso %d : Message a ete lu = [T%d] %s (de %d)\n", 
         rangConso, unMessage->type, unMessage->info, unMessage->rangProd);
  resCritiques.nb_pleine--;
  pthread_cond_signal(&autoDeposer[(resCritiques.dernier_type+1)%2]);
}

Bref, l'algo me semble ok; mais lorsque je fais la trace je n'ai pas de résultat cohérent avec ce que j'attends.


Merci pour votre aide !