Bonjour,
J'explore actuellement le multiprocessing en python3.3 et effectue un petit audit de performance entre deux stratégies. Voici le contexte:
J'ai un calcul à effectuer sur X objets. L'idée est donc de découper cette liste de de X objets en différentes listes et de confier le calcul à différents processus.
Dans le contexte qui m’intéresse ce calcul (sur les X objets) doit se répéter dans le temps. C'est à dire qu'après chaque "cycle", où l'on à effectué le calcul sur tous les objets, c'est repartis pour un tour. Et ce autant de fois que nécessaire.
Jargon que je vais utiliser:
- "X": Le nombre d'objets à traiter
- "Y": Nombre de processus (égal au nombre de cœur de cpu tant qu'a faire)
- "Chunk": Groupe d'objets confié a un processus
- "Cycle": Les chunks ont été confiés aux processus, les processus on terminé leur calculs et ont tous retourné leur résultat
Partant du principe que lors de la création d'un processus ont copie l'environnement avec lui puisque indépendant dans sont processus, j'ai envisagé deux stratégies possibles:
A (recr) - A chaque cycle, récréer les Y processus avec les données a calculer (les Y processus mourant a chaque cycle après avoir fait leurs travail)
B (keep) - Créer Y processus, leur donner le travail a faire chaque cycles sans les laisser mourir hormis au dernier cycle.
Je me suis donc dit que la stratégie B serait toujours plus performante puisque les processus resteraient en vie. J'ai donc implémenter les deux méthodes de la manière suivante:
processmanager.py
ProcessManager pour une gestion de processus mourant à chaque cycle.
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120 import sys if sys.version_info < (3, 3): sys.stdout.write("Python 3.3 required\n") sys.exit(1) from multiprocessing import Process, Pipe from multiprocessing.connection import wait def chunk(seq,m): i,j,x=len(seq),0,[] for k in range(m): a, j = j, j + (i+k)//m x.append(seq[a:j]) return x class ProcessManager(object): def __init__(self, nb_process, target): self.target = target self.nb_process = nb_process self.readers_pipes = [] def _start(self, chunked_things): self.readers_pipes = [] for i in range(self.nb_process): local_read_pipe, local_write_pipe = Pipe(duplex=False) self.readers_pipes.append(local_read_pipe) p = Process(target=run_process, args=(self.target, local_write_pipe, chunked_things[i])) p.start() local_write_pipe.close() def get_their_work(self, things_to_do): chunked_things = chunk(things_to_do, self.nb_process) self._start(chunked_things) things_done_collection = [] while self.readers_pipes: for r in wait(self.readers_pipes): try: things_dones = r.recv() except EOFError: self.readers_pipes.remove(r) else: self.readers_pipes.remove(r) things_done_collection.append(things_dones) return things_done_collection class KeepedAliveProcessManager(object): def __init__(self, nb_process, target): self.processs = [] self.target = target self.nb_process = nb_process self.readers_pipes = [] self.writers_pipes = [] def _start(self, chunked_things): for i in range(self.nb_process): local_read_pipe, local_write_pipe = Pipe(duplex=False) process_read_pipe, process_write_pipe = Pipe(duplex=False) self.readers_pipes.append(local_read_pipe) self.writers_pipes.append(process_write_pipe) p = Process(target=run_keeped_process, args=(self.target, local_write_pipe, process_read_pipe, chunked_things[i])) p.start() self.processs.append(p) local_write_pipe.close() process_read_pipe.close() def stop(self): for writer_pipe in self.writers_pipes: writer_pipe.send('stop') def get_their_work(self, things_to_do): chunked_things = chunk(things_to_do, self.nb_process) if not self.processs: self._start(chunked_things) else: for i in range(self.nb_process): #print('send things') self.writers_pipes[i].send(chunked_things[i]) things_done_collection = [] reader_useds = [] while self.readers_pipes: for r in wait(self.readers_pipes): try: things_dones = r.recv() except EOFError: reader_useds.append(r) self.readers_pipes.remove(r) else: reader_useds.append(r) self.readers_pipes.remove(r) things_done_collection.append(things_dones) self.readers_pipes = reader_useds return things_done_collection def run_keeped_process(target, main_write_pipe, process_read_pipe, things): things_dones = target(things) main_write_pipe.send(things_dones) new_things = None readers = [process_read_pipe] readers_used = [] while readers: for r in wait(readers): try: new_things = r.recv() #print('p: things received') except EOFError: pass finally: readers.remove(r) #print('p: continue') if new_things != 'stop': run_keeped_process(target, main_write_pipe, process_read_pipe, new_things) def run_process(target, main_write_pipe, things): things_dones = target(things) main_write_pipe.send(things_dones)
KeepedAliveProcessManager pour une gestion de processus maintenu en vie autant que souhaité.
Et utilisé de la sorte ces "Gestionnaires de processus":
main.py
Résultat:
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 import sys from multiprocessing import Process, Pipe from multiprocessing.connection import wait from processmanager import KeepedAliveProcessManager, ProcessManager from random import choice import timeit class Bug(object): def __init__(self, id): self.id = id self.willmove = None class SimpleWorker(object): def __init__(self, bugs): self.bugs = bugs def action_them(self): for bug in self.bugs: self.action_him(bug) def action_him(self, bug): bug.willmove = choice(range(9)) def run_worker(bugs): worker = SimpleWorker(bugs) worker.action_them() return worker.bugs X=1000 max_process = '4' max_cycles = '1000' bugs = [] for i in range(X): bugs.append(Bug(i)) sys.setrecursionlimit(2000) def compute_keep_alive(bugs, nb_process=1, repeat=1): process_manager = KeepedAliveProcessManager(nb_process, run_worker) for i in range(repeat): process_manager.get_their_work(bugs) process_manager.stop() def compute_recreate(bugs, nb_process=1, repeat=1): process_manager = ProcessManager(nb_process, run_worker) for i in range(repeat): process_manager.get_their_work(bugs) print('keep: 1 processs, 001 cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process=1, repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive')) print('recr: 1 processs, 001 cycles: ', timeit.timeit('compute_recreate(bugs, nb_process=1, repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate')) print('keep: 1 processs, '+max_cycles+' cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process=1, repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive')) print('recr: 1 processs, '+max_cycles+' cycles: ', timeit.timeit('compute_recreate(bugs, nb_process=1, repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate')) print('keep: '+max_process+' processs, 001 cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process='+max_process+', repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive')) print('recr: '+max_process+' processs, 001 cycles: ', timeit.timeit('compute_recreate(bugs, nb_process='+max_process+', repeat=1)',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate')) print('keep: '+max_process+' processs, '+max_cycles+' cycles: ', timeit.timeit('compute_keep_alive(bugs, nb_process='+max_process+', repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_keep_alive')) print('recr: '+max_process+' processs, '+max_cycles+' cycles: ', timeit.timeit('compute_recreate(bugs, nb_process='+max_process+', repeat='+max_cycles+')',number=1,setup='from __main__ import bugs,Bug,SimpleWorker,compute_recreate'))
On peux voir que la stratégie B est en effet plus rapide (près de 2x plus rapide). Cependant, si j'augmente le travail de chaque processus et réduit le nombre de cycles, soit:keep: 1 processs, 001 cycles: 0.00594463299785275
recr: 1 processs, 001 cycles: 0.005860060999111738
keep: 1 processs, 999 cycles: 6.441569473001437
recr: 1 processs, 999 cycles: 6.598000981997757
keep: 4 processs, 001 cycles: 0.00594096399800037
recr: 4 processs, 001 cycles: 0.00755401499918662
keep: 4 processs, 999 cycles: 2.705127483000979
recr: 4 processs, 999 cycles: 6.9383211890017265
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3 X=20000 max_process = '4' max_cycles = '100'Qu'estce qui fait que dans le deuxième cas de figure la stratégie A est plus rapide que la B ? De manière théorique, c'est à dire sans se fier a mon implémentation, quelle stratégie devrait être la plus optimale dans mon contexte de "calcul en continue" (sachant que X augmente avec le temps) ?keep: 1 processs, 001 cycles: 0.07270766800138517
recr: 1 processs, 001 cycles: 0.07591420199969434
keep: 1 processs, 100 cycles: 11.868655222999223
recr: 1 processs, 100 cycles: 7.394821689998935
keep: 4 processs, 001 cycles: 0.04141850199812325
recr: 4 processs, 001 cycles: 0.03829822099942248
keep: 4 processs, 100 cycles: 5.028753112001141
recr: 4 processs, 100 cycles: 3.963576910999109
Partager