1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| # -*- coding: utf_8 -*-
import multiprocessing as mp
import sys
__all__ = ["MultiProcess", "CoreProcess"]
if ( sys.version[0]=="3" ): #: En fait, j'utilise partout (enfin, seulement deux fois) des multiprocessing.Queue()
from queue import Empty #: mais l'exception qui dit que la Queue est vide est celle de la Queue standard,
else: #: laquelle se trouve dans le module 'Queue', ou 'queue' dans les versions 3.x de Python
from Queue import Empty
class CConsole (object):
# Vérrouille les écritures dans la console
def __init__(self):
self._lock = mp.Lock()
def __call__( self, txt ):
with self._lock:
print( txt )
class MultiProcess (object):
# Lance un nombre de Process identique au nombre de CPU sur la machine pour effectuer tous les traitements dans la liste
# ( Le plus important à comprendre, c'est les deux mp.Queue : ToDoQueue et ResultQueue
def __init__( self, CProcess, ToDoList, **DArgs ):
self.CProcess = CProcess
self.ToDoQueue = mp.Queue() #: Et non plus = CToDoQueue( ToDoListe )
for item in ToDoList: #: Ces deux lignes remplacent la class CToDoQueue présentes dans
self.ToDoQueue.put(item) #: les versions précédentes du module
self.Console = CConsole()
self.NbrProcess = min( mp.cpu_count(), len(ToDoList) ) #: Car on ne va quand-même pas lancer plus de Process qu'il y a d'ItemToDo.
self.Process = []
self.ResultQueue = mp.Queue()
self.DArgs = DArgs
def __call__( self ):
self.start()
self.join()
#: Ici, tous les travaux sont fini, on s'occupe de ranger les résultats
ResultList = []
QIsNotEmpty = True
while QIsNotEmpty:
try:
Result = self.ResultQueue.get_nowait()
ResultList += [Result]
except Empty: #: Quand la liste est vide, ça retourne une erreur
QIsNotEmpty = False
except Exception: #: Là, c'est pas normal!
raise
return ResultList # Dans n'importe quel ordre
def start( self ):
for i in range( self.NbrProcess ):
self.Process.append ( self.CProcess( self.ResultQueue , self.Console, self.ToDoQueue, **self.DArgs ) )
self.Process[i].start()
def join( self ):
for i in range( self.NbrProcess ):
self.Process[i].join()
class CoreProcess ( mp.Process ):
# Processus qui fait sa part de boulot tant qu'il en reste dans la liste
# ( Dans mon projet réel, l'initialisation du Process peut être lourde, c'est pourquoi
# ( il n'est pas initialisé autant de fois qu'il y a d'éléments à traiter mais seulement
# ( autant de fois qu'il y a de CPU
# Interface :
# run() : Doit être surchargé
# getItemToDo() : Obtenir une tâche à exécuter
# addItemResult() : Ajouter un résultat
# Empty : Exception quand la liste des tâches est vide
# Console( txt ) : Pour faire un print verrouillé
# Et aussi tout argument nommé passé à MultiProcess
def __init__( self, ResultQueue, Console, ToDoQueue, **DArgs ):
# Si surchargé il faut faire : super (MyCoreProcess,self)__init__(self, *TArgs, **DArgs)
super( CoreProcess, self ).__init__()
self.ResultQueue = ResultQueue
self.Console = Console
self.ToDoQueue = ToDoQueue
self.Empty = Empty #: Comme ça, l'utilisateur qui hérite de CoreProcess n'est pas emmerdé avec
for attr, value in DArgs.items(): #: la déclaration Queue.Empty ou queue.Empty
dict.__setattr__( self, attr, value ) #: Permet à tous les arguments nommés de DArgs, par exemple toto=33
#: d'être accessibles dans run() par self.toto
def run( self ):
# Devra être surchargé pour faire vraiment ce que l'on veut
import time, os
QIsNotEmpty = True
while QIsNotEmpty:
try:
# ////////////////////////////////////////////////
ItemToDo = self.getItemToDo()
time.sleep(0.2)
self.Console( "%s %s" % (ItemToDo, self.toto) )
self.addItemResult( ItemToDo*ItemToDo )
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
except self.Empty:
QIsNotEmpty = False
except Exception:
raise
def getItemToDo( self ):
return self.ToDoQueue.get_nowait()
def addItemResult( self, ItemResult ):
self.ResultQueue.put( ItemResult )
if __name__ == '__main__':
# Initialisations
ToDoList = [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
MyMultiProcess = MultiProcess( CoreProcess, ToDoList, toto=33 )
# Ici je fais le job, dans des Process, avec les start() et les joint()
ResultList = MyMultiProcess()
print('')
# Ici je fais ce que je veux avec mes résultats
for Result in ResultList:
print( Result )
print('') |