Bonjour à tous,

Je doit traiter de grande capacité de data. J'utilise Python et je viens de découvrir l'outils multiprocessing.
Avant tout, je travaille dans un serveur avec Slurm, donc je fais un script en bash pour appeler l’exécution de mon script en python.
J'ai donc réorienté mes scripts pour qu'ils puissent s'effectuer en parallèle. Sauf que je ne suis pas sur de ce que j'ai fait !
Dans l'idée, ça tourne et ça me donne des résultats corrects, mais j'aimerai savoir si j'ai bien fait ce que je pensais faire !

Ma fonction est assez grosse:
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def Whitening(a1,a2,a3,b1,b2,b3,st1,st2,Compo):
     for kk in range(1,2):
         name1=str(station[st1])+'.'+'0.0.'+str(jour[kk])+'.mseed'
         name2=str(station[st2])+'.'+'0.0.'+str(jour[kk])+'.mseed'
         path1=r"/scratch/tgaubert001/data/Champ_captant/1/Mseed/"+'/'+name1
         path2=r"/scratch/tgaubert001/data/Champ_captant/1/Mseed/"+'/'+name2
         D1=read(path1)      
         D2=read(path2)
         Xcross=[]
         d1=D1[Compo].data
         d2=D2[Compo].data
         print(d1.size,d2.size)
         tic1=D1[Compo].times()#depart 1 
         tic2=D2[Compo].times()#depart 2
         if tic2.size-tic1.size<0:
             print('la Station ',str(D1[Compo].stats.station),' est la premiere a etre mise en route')
             d1f=d1[int((tic1.size-tic2.size))::]
             d2f=d2[0:d2.size]
         elif tic2.size-tic1.size>0:
             print('la Station ',str(D2[Compo].stats.station),' est la premiere a etre mise en route')
             d2f=d2[int((tic2.size-tic1.size))::]
             d1f=d1[0:d1.size]
         else:
             print('la Station ',str(D1[Compo].stats.station),'et la station',str(D2[N].stats.station),'ont le meme temps de depart')
             d1f=d1[0:d1.size]
             d2f=d2[0:d2.size]  
         d1_filt=lfilter(b1, a1, d1f[0:d1f.size])     
         d2_filt=lfilter(b1, a1, d2f[0:d2f.size])      
         d1_filt=lfilter(b2, a2, d1_filt[0:d1_filt.size])     
         d2_filt=lfilter(b2, a2, d2_filt[0:d2_filt.size])
         d1_filt[0:d1_filt.size]=signal.filtfilt(b3,a3,d1_filt[0:d1_filt.size],axis=0,padtype = 'odd', padlen=3*(max(len(b3),len(a3))-1))
         d2_filt[0:d2_filt.size]=signal.filtfilt(b3,a3,d2_filt[0:d2_filt.size],axis=0,padtype = 'odd', padlen=3*(max(len(b3),len(a3))-1))
         print(d1_filt.size,d2_filt.size)
         if option_white==1:
            d1_fft=rfft(d1_filt[0:d1_filt.size])
            d2_fft=rfft(d2_filt[0:d2_filt.size])
            freq=np.linspace(0,fs,d1_fft.size)
            idx = (freq>=DFc2[0])*(freq<=DFc2[1])
            idxdebut= (freq>DFc2[0])*(freq<DFc2[0]+deltaf)
            idxfin= (freq>DFc2[1]-deltaf)*(freq<DFc2[1])
            C1=np.zeros((d1_fft.size),dtype=complex)
            C1[idx]=d1_fft[np.where(idx)]/abs(d1_fft[np.where(idx)])
            if C1[idxdebut].size>1:
                 C1[idxdebut]=C1[idxdebut]*(np.sin(np.pi/2*(np.arange(0,C1[idxdebut].size)/C1[idxdebut].size))**2)                  
            if C1[idxfin].size>1:
                 C1[idxfin]=C1[idxfin]*(np.cos(np.pi/2*(np.arange(0,C1[idxfin].size)/C1[idxfin].size))**2) 
            C1[0:np.where(idxdebut)[0][0]-1]=0
            C1[np.where(idxfin)[0][-1]:]=0
            c1_w=np.real(irfft(C1))
            C2=np.zeros((d2_fft.size),dtype=complex)#Initiation trace blanchie 2
            C2[idx]=d2_fft[np.where(idx)]/abs(d2_fft[np.where(idx)])
            if C2[idxdebut].size>1:
                 C2[idxdebut]=C2[idxdebut]*(np.sin(np.pi/2*(np.arange(0,C2[idxdebut].size)/C2[idxdebut].size))**2)              
            if C2[idxfin].size>1:
                 C2[idxfin]=C2[idxfin]*(np.cos(np.pi/2*(np.arange(0,C2[idxfin].size)/C2[idxfin].size))**2)            
            C2[0:np.where(idxdebut)[0][0]-1]=0
            C2[np.where(idxfin)[0][-1]:]=0
            c2_w=np.real(irfft(C2))
            c1_w=c1_w-c1_w.mean()
            c2_w=c2_w-c2_w.mean()
            c1_w=signal.detrend(c1_w)
            c2_w=signal.detrend(c2_w)  
            print(c1_w.size,c2_w.size,Compo)
           for i in range(nb_heure_jour-1):
            #    i=i+1
                Inn=((round(c1_w.size/nb_heure_jour))*(i-1))+np.arange(1,1+round(c1_w.size/nb_heure_jour))#Besoin de continuite pour le calcul ou non?
                #energie1.append(np.sum(c1_w[Inn]**2))# On valide. 
                #energie2.append(np.sum(c2_w[Inn]**2))# On valide
                c=scipy.signal.correlate(c1_w[Inn],c2_w[Inn],mode='same',method='fft')/(np.sum(c1_w[Inn]**2)*np.sum(c2_w[Inn]**2))
                C=c[c.size//2-700:c.size//2+700]
                Xcross.append(C)
                #print(C.size,i,len(Xcross))
                namefile='Composante'+str(Compo)+'Cross_corr_White_'+str(option_white)+'_Station_'+str(station[st1])+'_Station_'+str(station[st2])+'_filtre_'       
                namefile=namefile+str(DFc1[0])+'_'+str(DFc2[1])+'_Hz'+str(maxlag)+'jours_numero'+str(kk)
                repertoire= r'/scratch/tgaubert001/data/Champ_captant/1/Xcorr/'+str(station[st1])+'.0.0/'+'Station_'+str(station[st1])+'_Station_'+str(station[st2])
                os.makedirs(repertoire, exist_ok=True)
                os.chdir(repertoire)
               #print(Compo,i,len(Xcross))
                np.save(namefile,Xcross)
            #    return(nb_heure_jour)


Et je l'appel dans mon script comme suit :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
 
from multiprocessing.dummy import Pool as ThreadPool
def multi_run_wrapper(args):
	return Whitening(*args)
 
pool= ThreadPool(5)
N=0
E=1
V=2
for st1 in range(60,61):
    for st2 in range(1,2):
         aux_val=[(N),(E),(V)]
         results=pool.map(multi_run_wrapper,[(a1,a2,a3,b1,b2,b3,st1,st2,N),(a1,a2,a3,b1,b2,b3,st1,st2,E),(a1,a2,a3,b1,b2,b3,st1,st2,V)])

J'aimerai savoir si je fais bien tourner en simultané sur les paramètre N,E et V ma fonction Whitening


Mon script en bash :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
 
#!/bin/bash
#SBATCH --partition=standard
#SBATCH --account=uppa
#SBATCH --workdir=.
#SBATCH --time=5-00:00:00
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=16
#SBATCH --mem-per-cpu=2000
echo "------------------------------------------------------------"
type anaconda
echo "SCRATCHDIR           =  $SCRATCHDIR"
echo "SLURM_JOB_NAME       =  $SLURM_JOB_NAME"
echo "SLURM_JOB_ID         =  $SLURM_JOB_ID"
echo "SLURM_JOB_NODELIST   =  $SLURM_JOB_NODELIST"
echo ""
echo "SLURM_JOB_NUM_NODES  =  $SLURM_JOB_NUM_NODES"
echo "SLURM_NTASKS         =  $SLURM_NTASKS"
echo "SLURM_TASKS_PER_NODE =  $SLURM_TASKS_PER_NODE"
echo "SLURM_CPUS_PER_TASK  =  $SLURM_CPUS_PER_TASK"
echo "------------------------------------------------------------"
module purge
module load anaconda/3
source activate Obspy2
module list
eval conda info
echo "------------------------------------------------------------"
python Xcross_parallelize_test.py input_file.txt

Je vous remercie de m'avoir lu,


Thomasgeo