IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Navigation

Inscrivez-vous gratuitement
pour pouvoir participer, suivre les réponses en temps réel, voter pour les messages, poser vos propres questions et recevoir la newsletter

Python Discussion :

Le multi-thread n’est plus ce qu’il était


Sujet :

Python

  1. #1
    Membre régulier
    Homme Profil pro
    Étudiant
    Inscrit en
    Février 2013
    Messages
    7
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France

    Informations professionnelles :
    Activité : Étudiant
    Secteur : Conseil

    Informations forums :
    Inscription : Février 2013
    Messages : 7
    Par défaut Le multi-thread n’est plus ce qu’il était
    Bonjour,

    J’ai un problème embêtant : en essayant de traiter un gros ensemble de données via du multi-thread (le but est de projeter un graphe géré par la librairie Networkx, la partie qui nous intéresse concerne l’ajout d’arêtes), j’obtiens un temps de calcul 3 fois supérieur à la durée sans utiliser de threads (180 sec contre 60 sec environ). Or je n’ai pas de message d’erreur, et les résultats sont ceux attendus. Pire, en affichant le numéro des threads durant l’exécution (la ligne #print numthread mise en commentaire), je vois que les threads s’exécutent bien simultanément (j’ai une suite du genre « 003102321020102… »).

    Donc a priori tout fonctionne bien… Mais le temps de calcul est augmenté !


    Certes il y a des variables globales dans la fonction appelée par les threads, mais elles sont toutes en lecture (je pense que c'est préférable pour les performances, car les passer en paramètre serait assez lourd). La variable globale dictthreads, qui contient la liste d'arêtes générée par chaque thread, est elle en écriture, mais chaque thread ne la modifiera qu'une seule fois au cours de son lancement et agira sur une clé différente du dictionnaire. Donc a priori pas de problème à ce niveau-là.

    Je précise que je ne suis pas vraiment un expert en threading, donc peut-être qu’un join() n’est pas la méthode adaptée pour attendre que les threads terminent, même si ça me semble valable (j'ai quand même lu la FAQ sur la partie multi-thread, qui d'ailleurs ne parle pas de join() mais elle est franchement succincte). Je suis vraiment perdu.

    La partie du code qui nous intéresse

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
     
    dictthreads = {}
     
     
    def forthread(nb1,nb2,numthread):
        global product
        global prod_neighbors
        global cust_neighbors
        global product_graph
        global dictthreads
        listearretes = []
        for i in range(nb1,nb2):
            print numthread
            x=product[i]
            for y in prod_neighbors[x]:
                for z in cust_neighbors[y]:
                    nbCommuns = len(set(prod_neighbors[x]) & set(prod_neighbors[z]));
                    if nbCommuns > 0:
                        listearretes.append([(x,z,nbCommuns)])
                    #product_graph.add_weighted_edges_from([(x,z,nbCommuns)]) #tenter de liste les edges pour chaqe thread et de construire le graphe ensuite
        dictthreads[numthread] = listearretes
     
    print "lancement des threads"
    listethreads = {}
    for i in range(0,len(intervalles)):
    	listethreads[i] = Thread(None, forthread, None, (intervalles[i][0],intervalles[i][1],i), {})
    	listethreads[i].start()
    print "fin du lancement des threads, au nombre de ", len(listethreads.keys())
     
    for i in range(0,len(intervalles)):   
        listethreads[i].join()
        #listethreads[i]._Thread__stop()
        print "ajout d'un lot d'arretes numéro ", i+1, "en contenant ", len(dictthreads[i])
        for j in range(0,len(dictthreads[i])):
            product_graph.add_weighted_edges_from(dictthreads[i][j])
     
    print "fin de l'ajout d'arrêtes"
    Code complet :
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
     
    # -*-coding:Latin-1 -*
     
    import networkx as nx
    import random
    import fonction_base as fb
    import time
    from threading import Thread
    import time
     
    temps1 = time.clock()
     
    g=nx.readwrite.read_gpickle("testPartiel.graph")
     
    cg = fb.comp_connexe(g)
     
    #cg = nx.connected_component_subgraphs(g)[0] # /!\ on ne prend que la premiere composante connexe
    product=list()
    customers=list()
    print "création des listes de neouds"
    for n in cg.nodes():
        if type(n) == int:
            product.append(n)
        else:
            customers.append(n)
     
    print "nombre de produits", len(product)
    print "nombre de customers", len(customers)
    product_graph=nx.Graph(name="product")
    product_graph.add_nodes_from(product)
     
    customers_graph=nx.Graph(name="customers")
    customers_graph.add_nodes_from(customers)
     
    print "construction des dicos"
    # build a neighbors (products) dictionnary
    prod_neighbors={}
    for a in product:
        prod_neighbors[a]=cg.neighbors(a)
     
    # build a neighbors (customers) dictionnary   
    cust_neighbors={}
    for c in customers:
        cust_neighbors[c]=cg.neighbors(c)
     
     
    #product_indice=0
     
    nbprod = len(product)
    def diviseIntervalle(longueur, nbinter):
    	intervalles = [] #liste de bornes
    	pas = longueur/nbinter  #l=21, nb=4, pas=5
    	intervalles.append([0,pas])#[0,5]
    	for i in range(1,(longueur/pas)-1): #i=1 a 4
    		intervalles.append([i*(pas)+1,(i+1)*(pas)]) #[6,10] [11,15] [16,20]
    	intervalles.append([((longueur/pas)-1)*(pas)+1,longueur-1])
    	#intervalles.append([(longueur/pas)*pas+1,longueur-1])
    	return intervalles
    intervalles = diviseIntervalle(nbprod,5)
    print "nombre d'intervalles", len(intervalles)
     
    dictthreads = {}
     
     
    def forthread(nb1,nb2,numthread):
        global product
        global prod_neighbors
        global cust_neighbors
        global product_graph
        global dictthreads
        listearretes = []
        for i in range(nb1,nb2):
            print numthread
            x=product[i]
            for y in prod_neighbors[x]:
                for z in cust_neighbors[y]:
                    nbCommuns = len(set(prod_neighbors[x]) & set(prod_neighbors[z]));
                    if nbCommuns > 0:
                        listearretes.append([(x,z,nbCommuns)])
                    #product_graph.add_weighted_edges_from([(x,z,nbCommuns)]) #tenter de liste les edges pour chaqe thread et de construire le graphe ensuite
        dictthreads[numthread] = listearretes
     
    print "lancement des threads"
    listethreads = {}
    for i in range(0,len(intervalles)):
    	listethreads[i] = Thread(None, forthread, None, (intervalles[i][0],intervalles[i][1],i), {})
    	listethreads[i].start()
    print "fin du lancement des threads, au nombre de ", len(listethreads.keys())
     
    for i in range(0,len(intervalles)):   
        listethreads[i].join()
        #listethreads[i]._Thread__stop()
        print "ajout d'un lot d'arretes numéro ", i+1, "en contenant ", len(dictthreads[i])
        for j in range(0,len(dictthreads[i])):
            product_graph.add_weighted_edges_from(dictthreads[i][j])
     
    print "fin de l'ajout d'arrêtes"
     
     
     
    #print "construction cust"
    #for x in customers:
        #print "plop ",product_indice
        #product_indice=product_indice+1
        #if len(prod_neighbors[x]) > 1:
         #   add_clique(customers_graph,prod_neighbors[x])
        #for y in cust_neighbors[x]:
            #for z in prod_neighbors[y]:
    	    #nbCommuns = len(set(cust_neighbors[x]) & set(cust_neighbors[z]));
    	    #if nbCommuns > 0:
    		#customers_graph.add_weighted_edges_from([(x,y,nbCommuns)])
    #rq: construire custumer prend 5 fois plus de temps à nb_prod= 5k
    #complexité supposée : n^3
     
    #Pour 1K : 2s
    #pour 2.5k : 5s
    #pour 5k : 20s
    #pour 10k : 64s
     
     
    subgraphprod = fb.comp_connexe(product_graph)
     
    nx.readwrite.write_gpickle(subgraphprod,"productPartiel.graph")
     
    nx.readwrite.write_gpickle(customers_graph,"customers.graph")
     
     
     
     
     
    print "Le nombre de liens est de ",len(subgraphprod.edges()) ## liens
    print "Le nombre de noeuds est de ",len(subgraphprod.nodes()) ## noeuds
    print "La densité est de ", nx.density(subgraphprod)	#densite
    #print "Le diamètre est de ", nx.diameter(subgraph)	#diametre
     
    #print product_graph.nodes()
    #print product_graph.edges(data=True)
     
    duree = time.clock() - temps1
    print "Durée : ", duree
    Je vous remercie.

  2. #2
    Expert éminent
    Homme Profil pro
    Architecte technique retraité
    Inscrit en
    Juin 2008
    Messages
    21 741
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Manche (Basse Normandie)

    Informations professionnelles :
    Activité : Architecte technique retraité
    Secteur : Industrie

    Informations forums :
    Inscription : Juin 2008
    Messages : 21 741
    Par défaut
    Salut,

    Avec du code "pure" Python, les threads peuvent être exécutées sur différents CPU mais attendent d'avoir acquis le global interlock (GIL) pour s'exécuter. Dans la pratique, la capacité CPU reste 1 alors qu'ajouter des threads c'est plus de code à exécuter. In fine, çà prend plus de temps.

    Le seul intérêt des threads est d'avoir un ordonnancement "préemptif": si un thread attend la fin d'une IO, une autre sera exécutée "automatiquement". En général, on préfèrera dans ce cas un ordonnancement coopératif (greenlet,...): c'est moins coûteux en CPU que le threading.

    Si vous voulez plus de CPU, il faut passer par multiprocessing ou par concurrent.futures avec des "workers" "multiprocess" plutôt que "multithread" et organiser les traitements pour limiter les échanges d'informations entre "process".

    - W
    Architectures post-modernes.
    Python sur DVP c'est aussi des FAQs, des cours et tutoriels

  3. #3
    Membre régulier
    Homme Profil pro
    Étudiant
    Inscrit en
    Février 2013
    Messages
    7
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France

    Informations professionnelles :
    Activité : Étudiant
    Secteur : Conseil

    Informations forums :
    Inscription : Février 2013
    Messages : 7
    Par défaut
    Merci Wiztricks !

    J'ai testé le multiprocessing et effectivement les calculs sont accélérés ! Mais j'ai du coup un autre problème : chaque Process doit renvoyer une liste de noeuds que j'assemble et traite par la suite (les calculs consistent à choisir quels noeuds placer dans la liste)
    .
    Or le return n'a pas l'air de fonctionner avec les Process, et les variables globales ne peuvent pas être modifiées à l'intérieur d'un Process. Du coup, j'écris la liste de noeuds dans un fichier binaire mais l'écriture et la lecture du fichier prend tellement de temps que le multiprocessing n'est pas rentable...

    Y a-t-il un moyen efficace pour un Process de retourner les solutions qu'il a calculées ? J'ai essayé d'utiliser queue() mais le nombre de données très important ne semble pas non plus adapté à cette méthode.

    Merci beaucoup

  4. #4
    Expert éminent
    Homme Profil pro
    Architecte technique retraité
    Inscrit en
    Juin 2008
    Messages
    21 741
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Manche (Basse Normandie)

    Informations professionnelles :
    Activité : Architecte technique retraité
    Secteur : Industrie

    Informations forums :
    Inscription : Juin 2008
    Messages : 21 741
    Par défaut
    Salut,

    Y a-t-il un moyen efficace pour un Process de retourner les solutions qu'il a calculées ? J'ai essayé d'utiliser queue() mais le nombre de données très important ne semble pas non plus adapté à cette méthode.
    Le soucis est qu'on peut avoir de la mémoire partagée assez simplement et l'utiliser pour partager des "ctypes". Pour partager des objets Python, le plus est de passer par un "manager" et la création d'objets et de "proxy".
    Il faut prendre le temps de lire la documentation de multiprocessing pour voir comment adapter cela à votre cas.

    - W
    Architectures post-modernes.
    Python sur DVP c'est aussi des FAQs, des cours et tutoriels

Discussions similaires

  1. Tri multi-threadé
    Par Tifauv' dans le forum C
    Réponses: 8
    Dernier message: 28/06/2007, 09h00
  2. Réponses: 2
    Dernier message: 15/05/2004, 18h33
  3. Réponses: 16
    Dernier message: 30/01/2004, 11h05
  4. [VB6][active x] faire du multi-thread avec vb
    Par pecheur dans le forum VB 6 et antérieur
    Réponses: 9
    Dernier message: 20/05/2003, 12h01
  5. [Kylix] exception qtinft.dll et multi-threading
    Par leclaudio25 dans le forum EDI
    Réponses: 3
    Dernier message: 27/03/2003, 18h09

Partager

Partager
  • Envoyer la discussion sur Viadeo
  • Envoyer la discussion sur Twitter
  • Envoyer la discussion sur Google
  • Envoyer la discussion sur Facebook
  • Envoyer la discussion sur Digg
  • Envoyer la discussion sur Delicious
  • Envoyer la discussion sur MySpace
  • Envoyer la discussion sur Yahoo