Bonjour,

J'ai un module que j'aimerais paralléliser. Il me semblait que joblib serait le package qui me permettrait de faire ça le plus facile. Mais je rencontre des difficultés.

Mon script
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
import os
from joblib import Parallel, delayed
import process_ro2_data as prd
 
listStations = ["Berge","Reservoir","Foret_ouest","Foret_est","Foret_sol"]
rawFileDir      =os.path.join("C:\\","Users","anthi182","Desktop","Data_for_automatization","Raw_data") 
asciiOutDir     =os.path.join("C:\\","Users","anthi182","Desktop","Data_for_automatization","Ascii_data") 
Parallel(n_jobs=2)(delayed(prd.convert_CSbinary_to_csv)(iStation,rawFileDir,asciiOutDir) for iStation in listStations)
qui appelle le module suivant:
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import os
import re
import pandas as pd
import subprocess
import shutil
import fileinput
from datetime import datetime as dt #TODO , timedelta as td
 
def convert_CSbinary_to_csv(stationName,rawFileDir,asciiOutDir):
 
    #Find folders that match the pattern Ro2_YYYYMMDD
    listFieldCampains = [f for f in os.listdir(rawFileDir) if re.match(r'^Ro2_[0-9]{8}$', f)]
 
    for iFieldCampain in listFieldCampains:
 
        #Find folders that match the pattern Station_YYYYMMDD
        sationNameRegex=r'^' + stationName + r'_[0-9]{8}$'
        listDataCollection  = [f for f in os.listdir(os.path.join(rawFileDir,iFieldCampain)) if re.match(sationNameRegex, f)]
 
        for iDataCollection in listDataCollection:
            print(iDataCollection)
            for rawFile in os.listdir(os.path.join(rawFileDir,iFieldCampain,iDataCollection)):              
                print('\t'+rawFile)  
 
                inFile=os.path.join(rawFileDir,iFieldCampain,iDataCollection,rawFile)
                outFile=os.path.join(asciiOutDir,stationName,rawFile)
 
                # File type name handling           
                if bool(re.search("ts_data_",rawFile)) | bool(re.search("_Time_Series_",rawFile)):
                    extension="_eddy.csv" 
                elif bool(re.search("alerte",rawFile)):
                    extension="_alert.csv"         
                elif bool(re.search("met30min",rawFile)) | bool(re.search("_Flux_CSIFormat_",rawFile)) | bool(re.search("flux",rawFile)):
                    extension="_slow.csv" 
                elif bool(re.search("radiation",rawFile)) | bool(re.search("_Flux_Notes_",rawFile)):
                    extension="_slow2.csv"             
                else:                           # .cr1 / .cr3 / sys_log files / Config_Setting_Notes / Flux_AmeriFluxFormat_12
                    shutil.copy(inFile,outFile) # TODO solve issue: file with same name will overwrite
                    continue
 
                # Conversion from the Campbell binary file to csv format
                # TODO check compatibility with unix and Wine
                process=os.path.join(".\Bin","raw2ascii","csidft_convert.exe")
                subprocess.call([process, inFile, outFile, 'ToA5'])
 
                # Rename file according to date
                fileContent=pd.read_csv(outFile, sep=',', index_col=None, skiprows=[0,2,3], nrows=1)
                try:
                    fileStartTime=dt.strptime(fileContent.TIMESTAMP[0], "%Y-%m-%d %H:%M:%S")    # TIMESTAMP format for _alert.csv, _radiation.csv, and _met30min.csv
                except:
                    fileStartTime=dt.strptime(fileContent.TIMESTAMP[0], "%Y-%m-%d %H:%M:%S.%f") # TIMESTAMP format for _eddy.csv file
                newFileName=dt.strftime(fileStartTime,'%Y%m%d_%H%M')+extension
                shutil.move(outFile,os.path.join(asciiOutDir,stationName,newFileName))
Le module convert_CSbinary_to_csv sert à appeler un executable (csidft_convert.exe) et a mettre un peu d'ordre dans mes fichiers.
Lorsque j'execute mon script, j'obtiens le message d'erreur suivant:
Code : Sélectionner tout - Visualiser dans une fenêtre à part
BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
Pourtant, toutes les entrées sont "pickable" selon le site référence, car ce sont des listes ou des chaines de caractères.

Qu'est ce qui empêche la parallélisation ici ?

Merci !