Bonjour,

J’ai un problème embêtant : en essayant de traiter un gros ensemble de données via du multi-thread (le but est de projeter un graphe géré par la librairie Networkx, la partie qui nous intéresse concerne l’ajout d’arêtes), j’obtiens un temps de calcul 3 fois supérieur à la durée sans utiliser de threads (180 sec contre 60 sec environ). Or je n’ai pas de message d’erreur, et les résultats sont ceux attendus. Pire, en affichant le numéro des threads durant l’exécution (la ligne #print numthread mise en commentaire), je vois que les threads s’exécutent bien simultanément (j’ai une suite du genre « 003102321020102… »).

Donc a priori tout fonctionne bien… Mais le temps de calcul est augmenté !


Certes il y a des variables globales dans la fonction appelée par les threads, mais elles sont toutes en lecture (je pense que c'est préférable pour les performances, car les passer en paramètre serait assez lourd). La variable globale dictthreads, qui contient la liste d'arêtes générée par chaque thread, est elle en écriture, mais chaque thread ne la modifiera qu'une seule fois au cours de son lancement et agira sur une clé différente du dictionnaire. Donc a priori pas de problème à ce niveau-là.

Je précise que je ne suis pas vraiment un expert en threading, donc peut-être qu’un join() n’est pas la méthode adaptée pour attendre que les threads terminent, même si ça me semble valable (j'ai quand même lu la FAQ sur la partie multi-thread, qui d'ailleurs ne parle pas de join() mais elle est franchement succincte). Je suis vraiment perdu.

La partie du code qui nous intéresse

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 
dictthreads = {}
 
 
def forthread(nb1,nb2,numthread):
    global product
    global prod_neighbors
    global cust_neighbors
    global product_graph
    global dictthreads
    listearretes = []
    for i in range(nb1,nb2):
        print numthread
        x=product[i]
        for y in prod_neighbors[x]:
            for z in cust_neighbors[y]:
                nbCommuns = len(set(prod_neighbors[x]) & set(prod_neighbors[z]));
                if nbCommuns > 0:
                    listearretes.append([(x,z,nbCommuns)])
                #product_graph.add_weighted_edges_from([(x,z,nbCommuns)]) #tenter de liste les edges pour chaqe thread et de construire le graphe ensuite
    dictthreads[numthread] = listearretes
 
print "lancement des threads"
listethreads = {}
for i in range(0,len(intervalles)):
	listethreads[i] = Thread(None, forthread, None, (intervalles[i][0],intervalles[i][1],i), {})
	listethreads[i].start()
print "fin du lancement des threads, au nombre de ", len(listethreads.keys())
 
for i in range(0,len(intervalles)):   
    listethreads[i].join()
    #listethreads[i]._Thread__stop()
    print "ajout d'un lot d'arretes numéro ", i+1, "en contenant ", len(dictthreads[i])
    for j in range(0,len(dictthreads[i])):
        product_graph.add_weighted_edges_from(dictthreads[i][j])
 
print "fin de l'ajout d'arrêtes"
Code complet :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
 
# -*-coding:Latin-1 -*
 
import networkx as nx
import random
import fonction_base as fb
import time
from threading import Thread
import time
 
temps1 = time.clock()
 
g=nx.readwrite.read_gpickle("testPartiel.graph")
 
cg = fb.comp_connexe(g)
 
#cg = nx.connected_component_subgraphs(g)[0] # /!\ on ne prend que la premiere composante connexe
product=list()
customers=list()
print "création des listes de neouds"
for n in cg.nodes():
    if type(n) == int:
        product.append(n)
    else:
        customers.append(n)
 
print "nombre de produits", len(product)
print "nombre de customers", len(customers)
product_graph=nx.Graph(name="product")
product_graph.add_nodes_from(product)
 
customers_graph=nx.Graph(name="customers")
customers_graph.add_nodes_from(customers)
 
print "construction des dicos"
# build a neighbors (products) dictionnary
prod_neighbors={}
for a in product:
    prod_neighbors[a]=cg.neighbors(a)
 
# build a neighbors (customers) dictionnary   
cust_neighbors={}
for c in customers:
    cust_neighbors[c]=cg.neighbors(c)
 
 
#product_indice=0
 
nbprod = len(product)
def diviseIntervalle(longueur, nbinter):
	intervalles = [] #liste de bornes
	pas = longueur/nbinter  #l=21, nb=4, pas=5
	intervalles.append([0,pas])#[0,5]
	for i in range(1,(longueur/pas)-1): #i=1 a 4
		intervalles.append([i*(pas)+1,(i+1)*(pas)]) #[6,10] [11,15] [16,20]
	intervalles.append([((longueur/pas)-1)*(pas)+1,longueur-1])
	#intervalles.append([(longueur/pas)*pas+1,longueur-1])
	return intervalles
intervalles = diviseIntervalle(nbprod,5)
print "nombre d'intervalles", len(intervalles)
 
dictthreads = {}
 
 
def forthread(nb1,nb2,numthread):
    global product
    global prod_neighbors
    global cust_neighbors
    global product_graph
    global dictthreads
    listearretes = []
    for i in range(nb1,nb2):
        print numthread
        x=product[i]
        for y in prod_neighbors[x]:
            for z in cust_neighbors[y]:
                nbCommuns = len(set(prod_neighbors[x]) & set(prod_neighbors[z]));
                if nbCommuns > 0:
                    listearretes.append([(x,z,nbCommuns)])
                #product_graph.add_weighted_edges_from([(x,z,nbCommuns)]) #tenter de liste les edges pour chaqe thread et de construire le graphe ensuite
    dictthreads[numthread] = listearretes
 
print "lancement des threads"
listethreads = {}
for i in range(0,len(intervalles)):
	listethreads[i] = Thread(None, forthread, None, (intervalles[i][0],intervalles[i][1],i), {})
	listethreads[i].start()
print "fin du lancement des threads, au nombre de ", len(listethreads.keys())
 
for i in range(0,len(intervalles)):   
    listethreads[i].join()
    #listethreads[i]._Thread__stop()
    print "ajout d'un lot d'arretes numéro ", i+1, "en contenant ", len(dictthreads[i])
    for j in range(0,len(dictthreads[i])):
        product_graph.add_weighted_edges_from(dictthreads[i][j])
 
print "fin de l'ajout d'arrêtes"
 
 
 
#print "construction cust"
#for x in customers:
    #print "plop ",product_indice
    #product_indice=product_indice+1
    #if len(prod_neighbors[x]) > 1:
     #   add_clique(customers_graph,prod_neighbors[x])
    #for y in cust_neighbors[x]:
        #for z in prod_neighbors[y]:
	    #nbCommuns = len(set(cust_neighbors[x]) & set(cust_neighbors[z]));
	    #if nbCommuns > 0:
		#customers_graph.add_weighted_edges_from([(x,y,nbCommuns)])
#rq: construire custumer prend 5 fois plus de temps à nb_prod= 5k
#complexité supposée : n^3
 
#Pour 1K : 2s
#pour 2.5k : 5s
#pour 5k : 20s
#pour 10k : 64s
 
 
subgraphprod = fb.comp_connexe(product_graph)
 
nx.readwrite.write_gpickle(subgraphprod,"productPartiel.graph")
 
nx.readwrite.write_gpickle(customers_graph,"customers.graph")
 
 
 
 
 
print "Le nombre de liens est de ",len(subgraphprod.edges()) ## liens
print "Le nombre de noeuds est de ",len(subgraphprod.nodes()) ## noeuds
print "La densité est de ", nx.density(subgraphprod)	#densite
#print "Le diamètre est de ", nx.diameter(subgraph)	#diametre
 
#print product_graph.nodes()
#print product_graph.edges(data=True)
 
duree = time.clock() - temps1
print "Durée : ", duree
Je vous remercie.