Bonjour,
J'ai un code utilisant xarray pour extraire des données depuis une source netCDF (ECMWF ERA5).
De manière simplifiée, il y a quatre dimensions (x, y, z et t) et trois variables (r, h et g).
Toujours en simplifiant un peu, je dois extraire toutes les valeurs de r, h et g pour toutes les dimensions z et t pour chaque couple x/y.
J'ai un code série qui marche, dont une représentation simplifiée est :
Je l'ai écrit de cette manière (créer des listes pour chaque paramètre puis les passer à la fonction via map) dans l'objectif de paralléliser l'exécution en utilisant concurrent.futures.ThreadPoolExecutor ou concurrent.futures.ProcessPoolExecutor en remplaçant l'appel map de cette manière :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 import xarray as xr def extract_data(data,z,x,y,t): out_str = '' for t_val in t: for z_val in z: out_str += data.sel(x,y,t_val,z_val).data return(out_str) def data_get_write(dir,r,h,g,x,y,z,t): r_str = extract_data(r,x,y,z,t) with open(dir+'r_file', 'w') as f_r: f_r.write(r_str) h_str = extract_data(h,x,y,z,t) with open(dir+'h_file', 'w') as f_h: f_h.write(h_str) g_str = extract_data(g,x,y,z,t) with open(dir+'g_file', 'w') as f_g: f_g.write(g_str) return(<operation>) if __name__ == '__main__': ds=xr.open_dataset('data.nc') r=ds['r'] h=ds['h'] g=ds['g'] z=ds['z'] t=ds['t'] dir_list=[] x_list=[] y_list=[] for val_x in x: for val_y in y: dir=(<operation>) dir_list.append(dir) x_list.append(x) y_list.append(y) list_len=len(dir_list) r_list=[r]*list_len h_list=[h]*list_len g_list=[g]*list_len z_list=[z]*list_len t_list=[t]*list_len results = map(data_get_write, r_list, h_list, g_list, x_list, y_list, z_list, t_list)
Avec ThreadPoolExecutor le problème c'est que la version parallélisée est moins rapide que la version série...
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4 max_workers = min(32, int(arguments["-n"]) + 4) with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: results = executor.map(data_get_write, r_list, h_list, g_list, x_list, y_list, z_list, t_list)
Sur mon jeu de données de test (netcdf de 600Mo, dimensions x et y avec une vingtaine de valeurs et t à 6 valeurs) et mon serveur de test, la version série prend environ 2'40", la version parallèle 6'30". J'ai fait varier le nombre de cœurs alloués de 1 à 20, il n'y a aucun impact sur le temps d'exécution (ou du moins juste à la marge, plus ou moins quelques secondes, moins de 10%). Il n'y a pas plus de variabilité entre les runs en faisant changer le nombre de cœurs alloués qu'en relançant plusieurs fois de suite le même run.
Ça s'explique, si je ne m'abuse, par le verrou global qui empêche les threads concurrents d'accéder aux données en lecture de manière réellement concurrente puisqu'elles sont passées par référence et pas par valeur. Et ça semble être confirmé en basculant sur ProcessPoolExecutor même si la baisse de vitesse d'exécution est surprenante.
Avec ProcessPoolExecutor on constate un gain, ça passe à 1'25" mais à nouveau, quel que soit le nombre de cœurs, il n'y pas de modification significative du temps d'exécution. On pourrait penser qu'il s'agit d'un bottleneck sur le filesystem mais il n'y a pas non plus de modification en fonction du système de stockage utilisé, que ce soit le disque local au serveur ou un montage NFS.
Il doit donc y avoir quelque chose de fondamental, soit dans mon choix de bibliothèque de parallélisation ou dans mon code même, qui l'empêche de scaler avec la surface d'exécution. C'est peut-être très simple mais je suis complètement novice, autant en lecture / extraction de données depuis du netCDF qu'en parallélisation de code, donc il ne serait pas surprenant que ce soit un cas de pebkac. Toute aide / suggestion de piste pour améliorer mon code est la bienvenue, merci !
Partager