IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Navigation

Inscrivez-vous gratuitement
pour pouvoir participer, suivre les réponses en temps réel, voter pour les messages, poser vos propres questions et recevoir la newsletter

Python Discussion :

Stratégies multiprocessing: performances


Sujet :

Python

  1. #1
    Membre averti

    Homme Profil pro
    Développeur informatique
    Inscrit en
    Mars 2007
    Messages
    186
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 36
    Localisation : France, Alpes de Haute Provence (Provence Alpes Côte d'Azur)

    Informations professionnelles :
    Activité : Développeur informatique

    Informations forums :
    Inscription : Mars 2007
    Messages : 186
    Points : 399
    Points
    399
    Par défaut Stratégies multiprocessing: performances
    Bonjour,

    J'explore actuellement le multiprocessing en python3.3 et effectue un petit audit de performance entre deux stratégies. Voici le contexte:

    J'ai un calcul à effectuer sur X objets. L'idée est donc de découper cette liste de de X objets en différentes listes et de confier le calcul à différents processus.
    Dans le contexte qui m’intéresse ce calcul (sur les X objets) doit se répéter dans le temps. C'est à dire qu'après chaque "cycle", où l'on à effectué le calcul sur tous les objets, c'est repartis pour un tour. Et ce autant de fois que nécessaire.
    Jargon que je vais utiliser:
    - "X": Le nombre d'objets à traiter
    - "Y": Nombre de processus (égal au nombre de cœur de cpu tant qu'a faire)
    - "Chunk": Groupe d'objets confié a un processus
    - "Cycle": Les chunks ont été confiés aux processus, les processus on terminé leur calculs et ont tous retourné leur résultat

    Partant du principe que lors de la création d'un processus ont copie l'environnement avec lui puisque indépendant dans sont processus, j'ai envisagé deux stratégies possibles:
    A (recr) - A chaque cycle, récréer les Y processus avec les données a calculer (les Y processus mourant a chaque cycle après avoir fait leurs travail)
    B (keep) - Créer Y processus, leur donner le travail a faire chaque cycles sans les laisser mourir hormis au dernier cycle.

    Je me suis donc dit que la stratégie B serait toujours plus performante puisque les processus resteraient en vie. J'ai donc implémenter les deux méthodes de la manière suivante:

    processmanager.py
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    import sys
     
    if sys.version_info < (3, 3):
      sys.stdout.write("Python 3.3 required\n")
      sys.exit(1)
     
    from multiprocessing import Process, Pipe
    from multiprocessing.connection import wait
     
    def chunk(seq,m):
       i,j,x=len(seq),0,[]
       for k in range(m):
         a, j = j, j + (i+k)//m
         x.append(seq[a:j])
       return x
     
    class ProcessManager(object):
     
      def __init__(self, nb_process, target):
        self.target = target
        self.nb_process = nb_process
        self.readers_pipes = []
     
      def _start(self, chunked_things):
        self.readers_pipes = []
        for i in range(self.nb_process):
          local_read_pipe, local_write_pipe = Pipe(duplex=False)
          self.readers_pipes.append(local_read_pipe)
          p = Process(target=run_process, args=(self.target, local_write_pipe, chunked_things[i]))
          p.start()
          local_write_pipe.close()
     
      def get_their_work(self, things_to_do):
        chunked_things = chunk(things_to_do, self.nb_process)
        self._start(chunked_things)
        things_done_collection = []
        while self.readers_pipes:
          for r in wait(self.readers_pipes):
            try:
              things_dones = r.recv()
            except EOFError:
              self.readers_pipes.remove(r)
            else:
              self.readers_pipes.remove(r)
              things_done_collection.append(things_dones)
        return things_done_collection
     
     
    class KeepedAliveProcessManager(object):
     
      def __init__(self, nb_process, target):
        self.processs = []
        self.target = target
        self.nb_process = nb_process
        self.readers_pipes = []
        self.writers_pipes = []
     
      def _start(self, chunked_things):
        for i in range(self.nb_process):
          local_read_pipe, local_write_pipe = Pipe(duplex=False)
          process_read_pipe, process_write_pipe = Pipe(duplex=False)
          self.readers_pipes.append(local_read_pipe)
          self.writers_pipes.append(process_write_pipe)
          p = Process(target=run_keeped_process, args=(self.target, local_write_pipe, process_read_pipe, chunked_things[i]))
          p.start()
          self.processs.append(p)
          local_write_pipe.close()
          process_read_pipe.close()
     
      def stop(self):
        for writer_pipe in self.writers_pipes:
          writer_pipe.send('stop')
     
      def get_their_work(self, things_to_do):
        chunked_things = chunk(things_to_do, self.nb_process)
        if not self.processs:
          self._start(chunked_things)
        else:
          for i in range(self.nb_process):
            #print('send things')
            self.writers_pipes[i].send(chunked_things[i])
        things_done_collection = []
        reader_useds = []
        while self.readers_pipes:
          for r in wait(self.readers_pipes):
            try:
              things_dones = r.recv()
            except EOFError:
              reader_useds.append(r)
              self.readers_pipes.remove(r)
            else:
              reader_useds.append(r)
              self.readers_pipes.remove(r)
              things_done_collection.append(things_dones)
        self.readers_pipes = reader_useds
        return things_done_collection
     
    def run_keeped_process(target, main_write_pipe, process_read_pipe, things):
      things_dones = target(things)
      main_write_pipe.send(things_dones)
     
      new_things = None
      readers = [process_read_pipe]
      readers_used = []
      while readers:
        for r in wait(readers):
          try:
            new_things = r.recv()
            #print('p: things received')
          except EOFError:
            pass
          finally:
            readers.remove(r)
      #print('p: continue')
      if new_things != 'stop':
        run_keeped_process(target, main_write_pipe, process_read_pipe, new_things)
     
    def run_process(target, main_write_pipe, things):
      things_dones = target(things)
      main_write_pipe.send(things_dones)
    ProcessManager pour une gestion de processus mourant à chaque cycle.
    KeepedAliveProcessManager pour une gestion de processus maintenu en vie autant que souhaité.

    Et utilisé de la sorte ces "Gestionnaires de processus":

    main.py
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    import sys
     
    from multiprocessing import Process, Pipe
    from multiprocessing.connection import wait
    from processmanager import KeepedAliveProcessManager, ProcessManager
    from random import choice
    import timeit
     
    class Bug(object):
      def __init__(self, id):
        self.id = id
        self.willmove = None
     
    class SimpleWorker(object):
      def __init__(self, bugs):
        self.bugs = bugs
      def action_them(self):
        for bug in self.bugs:
          self.action_him(bug)
      def action_him(self, bug):
        bug.willmove = choice(range(9))
     
    def run_worker(bugs):
      worker = SimpleWorker(bugs)
      worker.action_them()
      return worker.bugs
     
    X=1000
    max_process = '4'
    max_cycles = '1000'
     
    bugs = []
    for i in range(X):   
      bugs.append(Bug(i))
     
    sys.setrecursionlimit(2000)
     
    def compute_keep_alive(bugs, nb_process=1, repeat=1):
      process_manager = KeepedAliveProcessManager(nb_process, run_worker)
      for i in range(repeat):
        process_manager.get_their_work(bugs)
      process_manager.stop()
     
    def compute_recreate(bugs, nb_process=1, repeat=1):
      process_manager = ProcessManager(nb_process, run_worker)
      for i in range(repeat):
        process_manager.get_their_work(bugs)
     
    print('keep: 1 processs, 001 cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process=1, repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
    print('recr: 1 processs, 001 cycles: ', timeit.timeit('compute_recreate(bugs, nb_process=1, repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
     
    print('keep: 1 processs, '+max_cycles+' cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process=1, repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
    print('recr: 1 processs, '+max_cycles+' cycles: ', timeit.timeit('compute_recreate(bugs, nb_process=1, repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
     
    print('keep: '+max_process+' processs, 001 cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process='+max_process+', repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
    print('recr: '+max_process+' processs, 001 cycles: ', timeit.timeit('compute_recreate(bugs, nb_process='+max_process+', repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
     
    print('keep: '+max_process+' processs, '+max_cycles+' cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process='+max_process+', repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
    print('recr: '+max_process+' processs, '+max_cycles+' cycles: ', timeit.timeit('compute_recreate(bugs, nb_process='+max_process+', repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
    Résultat:
    keep: 1 processs, 001 cycles: 0.00594463299785275
    recr: 1 processs, 001 cycles: 0.005860060999111738
    keep: 1 processs, 999 cycles: 6.441569473001437
    recr: 1 processs, 999 cycles: 6.598000981997757
    keep: 4 processs, 001 cycles: 0.00594096399800037
    recr: 4 processs, 001 cycles: 0.00755401499918662

    keep: 4 processs, 999 cycles: 2.705127483000979
    recr: 4 processs, 999 cycles: 6.9383211890017265
    On peux voir que la stratégie B est en effet plus rapide (près de 2x plus rapide). Cependant, si j'augmente le travail de chaque processus et réduit le nombre de cycles, soit:
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    X=20000
    max_process = '4'
    max_cycles = '100'
    keep: 1 processs, 001 cycles: 0.07270766800138517
    recr: 1 processs, 001 cycles: 0.07591420199969434
    keep: 1 processs, 100 cycles: 11.868655222999223
    recr: 1 processs, 100 cycles: 7.394821689998935
    keep: 4 processs, 001 cycles: 0.04141850199812325
    recr: 4 processs, 001 cycles: 0.03829822099942248

    keep: 4 processs, 100 cycles: 5.028753112001141
    recr: 4 processs, 100 cycles: 3.963576910999109
    Qu'estce qui fait que dans le deuxième cas de figure la stratégie A est plus rapide que la B ? De manière théorique, c'est à dire sans se fier a mon implémentation, quelle stratégie devrait être la plus optimale dans mon contexte de "calcul en continue" (sachant que X augmente avec le temps) ?

  2. #2
    Expert éminent sénior
    Homme Profil pro
    Architecte technique retraité
    Inscrit en
    Juin 2008
    Messages
    21 287
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Manche (Basse Normandie)

    Informations professionnelles :
    Activité : Architecte technique retraité
    Secteur : Industrie

    Informations forums :
    Inscription : Juin 2008
    Messages : 21 287
    Points : 36 776
    Points
    36 776
    Par défaut
    Citation Envoyé par buxbux Voir le message
    Qu'estce qui fait que dans le deuxième cas de figure la stratégie A est plus rapide que la B ? De manière théorique, c'est à dire sans se fier a mon implémentation, quelle stratégie devrait être la plus optimale dans mon contexte de "calcul en continue" (sachant que X augmente avec le temps) ?
    Sur le papier B consomme moins de ressources que A doit toujours être plus rapide.
    Si ce n'est pas le cas, c'est que les processus de B "vieillissent mal".
    Par exemple, on peut imaginer que les objets traités ne sont pas recyclés. La mémoire consommé augmente. Le système pagine et çà rame.
    Dans ce cas, A "rend" la mémoire assez tôt et les coûts de création des processus sont compensés par celui des accès disques.
    Puisque les objets mémoire peuvent être détruits A n'est meilleurs que parce que c'est une façon efficace de faire le ménage.

    Je ne dis pas que c'est votre cas (je n'ai pas le temps de regarder votre code pour l'instant). Je me contente de vous donner des pistes.

    - W
    Architectures post-modernes.
    Python sur DVP c'est aussi des FAQs, des cours et tutoriels

  3. #3
    Membre averti

    Homme Profil pro
    Développeur informatique
    Inscrit en
    Mars 2007
    Messages
    186
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 36
    Localisation : France, Alpes de Haute Provence (Provence Alpes Côte d'Azur)

    Informations professionnelles :
    Activité : Développeur informatique

    Informations forums :
    Inscription : Mars 2007
    Messages : 186
    Points : 399
    Points
    399
    Par défaut
    Piste intéressante ! Je vais regarder du coté du code exécuté par les processus et voir si un peu de ménage manuel fait la différence.

Discussions similaires

  1. performance entre 3DS, ase, asc ...
    Par amaury pouly dans le forum OpenGL
    Réponses: 3
    Dernier message: 24/03/2003, 11h41
  2. [Stratégie] Utiliser un systeme de bufferisation
    Par arnolanf dans le forum VB 6 et antérieur
    Réponses: 4
    Dernier message: 31/10/2002, 16h49
  3. [] [Stratégie] Comment créer un fichier log
    Par Skeezo dans le forum Installation, Déploiement et Sécurité
    Réponses: 4
    Dernier message: 16/09/2002, 18h30

Partager

Partager
  • Envoyer la discussion sur Viadeo
  • Envoyer la discussion sur Twitter
  • Envoyer la discussion sur Google
  • Envoyer la discussion sur Facebook
  • Envoyer la discussion sur Digg
  • Envoyer la discussion sur Delicious
  • Envoyer la discussion sur MySpace
  • Envoyer la discussion sur Yahoo