Salut.
Actuellement mon architecture est la suivante:
- Une interface (point d'entrée du programme)
- Un thread "principal", crée depuis l'interface, lance une simulation. Cette simulation correspond à l'exécution d'un certain nombre de tâches. Ces tâches peuvent être soit nombreuses et rapides, soit peu nombreuses mais très longues. Afin d'implémenter un mécanisme de pause/resume/stop, ces tâches sont à leur tour exécutées dans des threads (en réalité un seul traitant chaque tâche au fur et à mesure).
Tout cela marche assez bien. Mais comme indiqué ci-dessus, certaines simulations font appels à des tâches longues (je parle ici d'heures). Je souhaite désormais en exécuter plusieurs en parallèle. Si j'augmente le nombre de threads en charge de traiter les tâches, je ne gagne rien en terme de temps calcul, j'en perds même car les calculs en question sont gournands. Je pense donc me tourner vers du multiprocessing.
Le problème c'est que je sèche complètement quand il s'agit de coupler le multithreading et le multiprocessing.
J'ai écrit un démonstrateur (ci-dessous) où j'ai enlevé le surplus (dont les méthodes permettant de mettre en pause les calculs):
Le code ci-dessus est la version sans multi-processing fonctionnant bien pour moi (commentaires acceptés). Naivement je pensais ajouter ceci pour lancer la fonction target dans un process:
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111 import threading, time from multiprocessing import Process import queue import random random.seed(1234) class Worker(threading.Thread): def __init__(self, target, tasks, results, name='', **kwgs): super(Worker, self).__init__() self.target = target self.tasks = tasks self.results = results self.name = name self._kwgs = kwgs def run(self): while not self.tasks.empty(): task = self.tasks.get() res = self.target(*task, **self._kwgs) print(' WORKER-%s : %s' % (self.name, res)) self.results.put((task, res)) class ThreadedTasks(threading.Thread): def __init__(self, target, tasks, nprocs=1, **kwgs): super(ThreadedTasks, self).__init__() self.target = target # Create a queue where tasks will be put and shared with workers self._generator_tasks = tasks self._qtasks = queue.Queue(maxsize=nprocs) # Create a queue to share resutls between threads self._qresults = queue.Queue() # Create workers and put the same number of tasks in the queue, # otherwise they will finish their "job" immediately. self._workers = [] for i in range(nprocs): task = next(self._generator_tasks) self._qtasks.put(task, block=True) # Build worker worker = Worker( target, self._qtasks, self._qresults, name=str(i), **kwgs, ) self._workers.append(worker) def run(self): # Start workers for worker in self._workers: worker.start() for task in self._generator_tasks: self._qtasks.put(task, block=True) # Wait for workers to finish for worker in self._workers: worker.join() class Solver(object): def __init__(self, name): self.name = name def __repr__(self): return self.name def __str__(self): return str(self.name) def solve(self, *args, **kwgs): timelaps = round(random.random(), 3) time.sleep(timelaps) return timelaps def func(solver, *args, **kwgs): return solver.solve(*args, **kwgs) def main(): # Create a generator of tasks. The main advantage of the generator is # its light memory footprint. Especially for large number of tasks tasks = ((Solver(i), i) for i in range(50)) # Create main thread thread = ThreadedTasks(func, tasks, nprocs=3) # Start main thread thread.start() # Wait until the thread exits thread.join() if __name__ == '__main__': main()
Ce qui mis ensemble ferait (juste la classe Worker:
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6 q= queue.Queue() p = Process(target=my_function, args=(q, 1)) p.start() p.join() res = q.get()
J'ai alors un beau problème de méthode non "pickable". Je ne vois pas comment le contourner. Et je me dis que je suis en train de constuire une machine à gaz. Quel serait selon vous la meilleure approche pour réaliser ce que je souhaite?
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 class Worker(threading.Thread): def __init__(self, target, tasks, results, name='', **kwgs): super(Worker, self).__init__() self.target = target self.tasks = tasks self.results = results self.name = name self._kwgs = kwgs def run(self): def process_func_wrapper(func, q, *args, **kwargs): q.put(func(*args, **kwargs)) while not self.tasks.empty(): task = self.tasks.get() q = queue.Queue() p = Process(target=process_func_wrapper, args=(self.target, q, *task), kwargs=self._kwgs) p.start() p.join() res = q.get() print(' WORKER-%s : %s' % (self.name, res)) self.results.put((task, res)
CiaoException in thread 0:
Traceback (most recent call last):
File "C:\users-apps\Anaconda3\lib\threading.py", line 914, in _bootstrap_inner
self.run()
File "pause_thread_ter.py", line 33, in run
p.start()
File "C:\users-apps\Anaconda3\lib\multiprocessing\process.py", line 105, in st
art
self._popen = self._Popen(self)
File "C:\users-apps\Anaconda3\lib\multiprocessing\context.py", line 212, in _P
open
return _default_context.get_context().Process._Popen(process_obj)
File "C:\users-apps\Anaconda3\lib\multiprocessing\context.py", line 313, in _P
open
return Popen(process_obj)
File "C:\users-apps\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line
66, in __init__
reduction.dump(process_obj, to_child)
File "C:\users-apps\Anaconda3\lib\multiprocessing\reduction.py", line 59, in d
ump
ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
Julien
Partager