Bonjour,

J'explore actuellement le multiprocessing en python3.3 et effectue un petit audit de performance entre deux stratégies. Voici le contexte:

J'ai un calcul à effectuer sur X objets. L'idée est donc de découper cette liste de de X objets en différentes listes et de confier le calcul à différents processus.
Dans le contexte qui m’intéresse ce calcul (sur les X objets) doit se répéter dans le temps. C'est à dire qu'après chaque "cycle", où l'on à effectué le calcul sur tous les objets, c'est repartis pour un tour. Et ce autant de fois que nécessaire.
Jargon que je vais utiliser:
- "X": Le nombre d'objets à traiter
- "Y": Nombre de processus (égal au nombre de cœur de cpu tant qu'a faire)
- "Chunk": Groupe d'objets confié a un processus
- "Cycle": Les chunks ont été confiés aux processus, les processus on terminé leur calculs et ont tous retourné leur résultat

Partant du principe que lors de la création d'un processus ont copie l'environnement avec lui puisque indépendant dans sont processus, j'ai envisagé deux stratégies possibles:
A (recr) - A chaque cycle, récréer les Y processus avec les données a calculer (les Y processus mourant a chaque cycle après avoir fait leurs travail)
B (keep) - Créer Y processus, leur donner le travail a faire chaque cycles sans les laisser mourir hormis au dernier cycle.

Je me suis donc dit que la stratégie B serait toujours plus performante puisque les processus resteraient en vie. J'ai donc implémenter les deux méthodes de la manière suivante:

processmanager.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import sys
 
if sys.version_info < (3, 3):
  sys.stdout.write("Python 3.3 required\n")
  sys.exit(1)
 
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
 
def chunk(seq,m):
   i,j,x=len(seq),0,[]
   for k in range(m):
     a, j = j, j + (i+k)//m
     x.append(seq[a:j])
   return x
 
class ProcessManager(object):
 
  def __init__(self, nb_process, target):
    self.target = target
    self.nb_process = nb_process
    self.readers_pipes = []
 
  def _start(self, chunked_things):
    self.readers_pipes = []
    for i in range(self.nb_process):
      local_read_pipe, local_write_pipe = Pipe(duplex=False)
      self.readers_pipes.append(local_read_pipe)
      p = Process(target=run_process, args=(self.target, local_write_pipe, chunked_things[i]))
      p.start()
      local_write_pipe.close()
 
  def get_their_work(self, things_to_do):
    chunked_things = chunk(things_to_do, self.nb_process)
    self._start(chunked_things)
    things_done_collection = []
    while self.readers_pipes:
      for r in wait(self.readers_pipes):
        try:
          things_dones = r.recv()
        except EOFError:
          self.readers_pipes.remove(r)
        else:
          self.readers_pipes.remove(r)
          things_done_collection.append(things_dones)
    return things_done_collection
 
 
class KeepedAliveProcessManager(object):
 
  def __init__(self, nb_process, target):
    self.processs = []
    self.target = target
    self.nb_process = nb_process
    self.readers_pipes = []
    self.writers_pipes = []
 
  def _start(self, chunked_things):
    for i in range(self.nb_process):
      local_read_pipe, local_write_pipe = Pipe(duplex=False)
      process_read_pipe, process_write_pipe = Pipe(duplex=False)
      self.readers_pipes.append(local_read_pipe)
      self.writers_pipes.append(process_write_pipe)
      p = Process(target=run_keeped_process, args=(self.target, local_write_pipe, process_read_pipe, chunked_things[i]))
      p.start()
      self.processs.append(p)
      local_write_pipe.close()
      process_read_pipe.close()
 
  def stop(self):
    for writer_pipe in self.writers_pipes:
      writer_pipe.send('stop')
 
  def get_their_work(self, things_to_do):
    chunked_things = chunk(things_to_do, self.nb_process)
    if not self.processs:
      self._start(chunked_things)
    else:
      for i in range(self.nb_process):
        #print('send things')
        self.writers_pipes[i].send(chunked_things[i])
    things_done_collection = []
    reader_useds = []
    while self.readers_pipes:
      for r in wait(self.readers_pipes):
        try:
          things_dones = r.recv()
        except EOFError:
          reader_useds.append(r)
          self.readers_pipes.remove(r)
        else:
          reader_useds.append(r)
          self.readers_pipes.remove(r)
          things_done_collection.append(things_dones)
    self.readers_pipes = reader_useds
    return things_done_collection
 
def run_keeped_process(target, main_write_pipe, process_read_pipe, things):
  things_dones = target(things)
  main_write_pipe.send(things_dones)
 
  new_things = None
  readers = [process_read_pipe]
  readers_used = []
  while readers:
    for r in wait(readers):
      try:
        new_things = r.recv()
        #print('p: things received')
      except EOFError:
        pass
      finally:
        readers.remove(r)
  #print('p: continue')
  if new_things != 'stop':
    run_keeped_process(target, main_write_pipe, process_read_pipe, new_things)
 
def run_process(target, main_write_pipe, things):
  things_dones = target(things)
  main_write_pipe.send(things_dones)
ProcessManager pour une gestion de processus mourant à chaque cycle.
KeepedAliveProcessManager pour une gestion de processus maintenu en vie autant que souhaité.

Et utilisé de la sorte ces "Gestionnaires de processus":

main.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import sys
 
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
from processmanager import KeepedAliveProcessManager, ProcessManager
from random import choice
import timeit
 
class Bug(object):
  def __init__(self, id):
    self.id = id
    self.willmove = None
 
class SimpleWorker(object):
  def __init__(self, bugs):
    self.bugs = bugs
  def action_them(self):
    for bug in self.bugs:
      self.action_him(bug)
  def action_him(self, bug):
    bug.willmove = choice(range(9))
 
def run_worker(bugs):
  worker = SimpleWorker(bugs)
  worker.action_them()
  return worker.bugs
 
X=1000
max_process = '4'
max_cycles = '1000'
 
bugs = []
for i in range(X):   
  bugs.append(Bug(i))
 
sys.setrecursionlimit(2000)
 
def compute_keep_alive(bugs, nb_process=1, repeat=1):
  process_manager = KeepedAliveProcessManager(nb_process, run_worker)
  for i in range(repeat):
    process_manager.get_their_work(bugs)
  process_manager.stop()
 
def compute_recreate(bugs, nb_process=1, repeat=1):
  process_manager = ProcessManager(nb_process, run_worker)
  for i in range(repeat):
    process_manager.get_their_work(bugs)
 
print('keep: 1 processs, 001 cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process=1, repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
print('recr: 1 processs, 001 cycles: ', timeit.timeit('compute_recreate(bugs, nb_process=1, repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
 
print('keep: 1 processs, '+max_cycles+' cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process=1, repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
print('recr: 1 processs, '+max_cycles+' cycles: ', timeit.timeit('compute_recreate(bugs, nb_process=1, repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
 
print('keep: '+max_process+' processs, 001 cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process='+max_process+', repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
print('recr: '+max_process+' processs, 001 cycles: ', timeit.timeit('compute_recreate(bugs, nb_process='+max_process+', repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
 
print('keep: '+max_process+' processs, '+max_cycles+' cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process='+max_process+', repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive'))
print('recr: '+max_process+' processs, '+max_cycles+' cycles: ', timeit.timeit('compute_recreate(bugs, nb_process='+max_process+', repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
Résultat:
keep: 1 processs, 001 cycles: 0.00594463299785275
recr: 1 processs, 001 cycles: 0.005860060999111738
keep: 1 processs, 999 cycles: 6.441569473001437
recr: 1 processs, 999 cycles: 6.598000981997757
keep: 4 processs, 001 cycles: 0.00594096399800037
recr: 4 processs, 001 cycles: 0.00755401499918662

keep: 4 processs, 999 cycles: 2.705127483000979
recr: 4 processs, 999 cycles: 6.9383211890017265
On peux voir que la stratégie B est en effet plus rapide (près de 2x plus rapide). Cependant, si j'augmente le travail de chaque processus et réduit le nombre de cycles, soit:
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
X=20000
max_process = '4'
max_cycles = '100'
keep: 1 processs, 001 cycles: 0.07270766800138517
recr: 1 processs, 001 cycles: 0.07591420199969434
keep: 1 processs, 100 cycles: 11.868655222999223
recr: 1 processs, 100 cycles: 7.394821689998935
keep: 4 processs, 001 cycles: 0.04141850199812325
recr: 4 processs, 001 cycles: 0.03829822099942248

keep: 4 processs, 100 cycles: 5.028753112001141
recr: 4 processs, 100 cycles: 3.963576910999109
Qu'estce qui fait que dans le deuxième cas de figure la stratégie A est plus rapide que la B ? De manière théorique, c'est à dire sans se fier a mon implémentation, quelle stratégie devrait être la plus optimale dans mon contexte de "calcul en continue" (sachant que X augmente avec le temps) ?