Bonjour,
Je veux paralléliser un code d'extraction de données depuis un dataset netCDF et j'obtiens une performance moins bonne que si je ne parallélise pas.
Ce n'est pas un problème de ProcessPoolExecutor à proprement parler car avec un cas test j'obtiens bien une accélération lorsque c'est parallélisé. Voici le code (attention, c'est basique, pas beau et tout mais c'est juste un test pour valider le principe) de mon cas test :
On a un gain pas fou mais réel quand on passe en parallélisé :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 #!/usr/bin/env python3 # coding: utf-8 ############################################################################### """\ ppp.py ---------------------- Python Parallelisation Performance Compute sum of prime numbers in a range. usage: ppp.py [-p] [-n <nb_cores>] options: -n <nb_cores> Use <nb_cores> local cores to extract data Default: 1 -p Turn on parallelism. """ import sys import argparse import xarray as xr import concurrent.futures import time def parse_args(): """ Parse arguments """ parser = argparse.ArgumentParser( description="Compute sum of prime \ numbers in a range" ) parser.add_argument( "-p", action="store_true", help="use multiprocessing", ) parser.add_argument( "-n", type=int, help="number of cores to use in parallel processing", default=1, ) args = parser.parse_args() return args def task(nb: int) -> int: out = 0 if nb < 1: print("Cannot compute primes of numbers less than 1") sys.exit(-1) for i in range(1, nb): if (nb % i) == 0: out += i return out def main(argv: sys.argv): """ Main execution function. """ # ===================== # Parse arguments # ===================== args = parse_args() # Profiling start = time.perf_counter() # --- # Values to be submitted to task vals = [10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000] # --- # Actual function # TODO: get parallelism to provide speedup over the simple linear version if args.p: # Multiprocessing call to the wrapper extraction function max_workers = min(32, int(args.n) + 4) with concurrent.futures.ProcessPoolExecutor( max_workers=max_workers ) as executor: res = executor.map( task, vals, ) for elem in res: print(elem) else: for elem in vals: res = task(elem) print(res) # --- end = time.perf_counter() print(f"Execution time: {end - start} seconds.") if __name__ == "__main__": sys.exit(main(sys.argv[1:]))
Je passe une bête liste
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7 $ ./ppp.py -n 8 ... Execution time: 16.775680779999675 seconds. $ ./ppp.py -n 8 -p ... Execution time: 4.326744636000512 seconds.avec toujours la même valeur au map, ça ça fonctionne.
Code : Sélectionner tout - Visualiser dans une fenêtre à part vals = [10000000, 10000000, ...]
Par contre, quand je reprends ce code et que je l'adapte à ce que je veux faire (lire un dataset netCDF et extraire les données contenues dedans vers des fichiers texte) le comportement est inverse avec une exécution parallèle environ 40 fois plus lente que l'exécution séquentielle !
La seule chose qui change c'est que vals est maintenant
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 #!/usr/bin/env python3 # coding: utf-8 ############################################################################### """\ ppp.py ---------------------- Python Parallelisation Performance Extract information from ECMWF ERA5 model data for use in atmospheric delay correction computation. usage: ppp.py [-p] [-n <nb_cores>] options: -n <nb_cores> Use <nb_cores> local cores to extract data Default: 1 -p Turn on parallelism. """ import sys import argparse import xarray as xr from pathlib import Path import concurrent.futures import time def parse_args(): """ Parse arguments """ parser = argparse.ArgumentParser( description="Extract atmospheric correction \ data from ECMWF ERA5 necdf files" ) parser.add_argument( "-p", action="store_true", help="use multiprocessing", ) parser.add_argument( "-n", type=int, help="number of cores to use in parallel processing", default=1, ) args = parser.parse_args() return args def task(d: list): """ Read first element of dataset and return it as a string """ out = d[0, 0, 0, 0] return f"{str(out)}\n" def main(argv: sys.argv): """ Main execution function. """ # ===================== # Parse arguments # ===================== args = parse_args() # Profiling start = time.perf_counter() # --- # Values to be submitted to task ds = xr.load_dataset('data.nc') s = ds.t.data.shape[0] # Build list of values to be mapped to function vals = [ds.t.data for d in range(0, s)] # -------------------------------------- # --- # Actual function # TODO: get parallelism to provide speedup over the simple linear version if args.p: # Multiprocessing call to the wrapper extraction function max_workers = min(32, int(args.n) + 4) with concurrent.futures.ProcessPoolExecutor( max_workers=max_workers ) as executor: res = executor.map( task, vals, ) outtext = '' for val in res: outtext += val outfile = Path('checkres.txt') outfile.write_text(outtext) else: outtext = '' for elem in vals: outtext += task(elem) outfile = Path('checkres.txt') outfile.write_text(outtext) # --- end = time.perf_counter() print(f"Execution time: {end - start} seconds.") if __name__ == "__main__": sys.exit(main(sys.argv[1:]))
avec ds qui est le dataset netCDF lu avec xarray
Code : Sélectionner tout - Visualiser dans une fenêtre à part vals = [ds.t.data for d in range(0, s)]
ds est un xarray.core.dataset.Dataset et t est un numpy.ndarray contenant des float32
Code : Sélectionner tout - Visualiser dans une fenêtre à part ds = xr.load_dataset('data.nc')
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7 >>> ds.t.data.shape (540, 37, 21, 21) >>> type(ds.t.data) <class 'numpy.ndarray'> >>> ds.t.data[0,0,0,0] np.float32(285.86597)
et data.nc est obtenu auprès du climate data store ECMWF avec cette requête pour ceux qui seraient curieux (je ne colle pas les données ici car ce petit dataset de test fait tout de même 55Mo, ça ne passe pas en pj)
Et là, la méthode non parallélisée
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 #!/usr/bin/env python3 import cdsapi # For a standard test dataset dataset = "reanalysis-era5-pressure-levels" request = { "product_type": ["reanalysis"], "variable": [ "temperature" ], "year": [ "2014", "2015", "2016", "2017", "2018", "2019", "2020", "2021", "2022" ], "month": [ "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12" ], "day": [ "03", "09", "15", "21", "27" ], "time": ["14:00"], "pressure_level": [ "1", "2", "3", "5", "7", "10", "20", "30", "50", "70", "100", "125", "150", "175", "200", "225", "250", "300", "350", "400", "450", "500", "550", "600", "650", "700", "750", "775", "800", "825", "850", "875", "900", "925", "950", "975", "1000" ], "data_format": "netcdf", "download_format": "unarchived", "area": [46.5, 1, 41.5, 6] } client = cdsapi.Client() client.retrieve(dataset, request, target='data.nc')
est plus rapide que la version parallélisée
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3 for elem in vals: outtext += task(elem)
Là il y a un truc qui ne va vraiment pas et j'ai beau chercher je n'arrive pas à comprendre, sauf que je me doute qu'il y a un truc pas net au niveau de la construction de la liste des paramètres qui est passée au map.
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5 $ ./ppp.py -n 8 Execution time: 1.4252492609994079 seconds. $ ./ppp.py -n 8 -p Execution time: 85.55303567800001 seconds.
Contrairement au cas test du premier code où quand on le fait tourner en parallèle on voit bien tous les coeurs d'exécution être chargés à 100%, dans le second code on voit que c'est limité à l'équivalent d'un seul coeur d'exécution à 100%, réparti sur le nombre de coeurs sélectionnées par le paramètre n. On dirait que c'est comme si le dataset généré par ds.t.data était le même pour toutes les instances et qu'il était bloqué à une lecture unique à la fois par une sorte de GIL (qui ne devrait pas jouer ici car c'est des Process, pas des Thread mais bon, c'est l'idée).
J'ai essayé tout ce qui m'est venu à l'esprit pour construire la liste des paramètres "vals", allant de la simple affectation du dataset déjà lu dans une variable jusquà carrément de relire le fichier netCDF pour chaque élément de la liste, ce qui ralentit sacrément le code mais n'améliore pas la performance de la version parallélisée versus le séquentiel, rien n'y fait
J'ai remarqué que toutes ces méthodes n'entraînait pas une augmentation significative de la mémoire, ça donne vraiment l'impression que python reconnaît qu'il s'agit de n fois des mêmes données et fait référence n fois à une même zone mémoire.
Du coup j'ai pris une méthode radicale, lecture du dataset dans une variable puis un copy.deepcopy pour chaque instance de la liste
avec cette méthode, je vois bien la consommation mémoire exploser au début de l'exécution, correspondant donc bien aux n deepcopy de la construction de la liste mais toujours le même ratio de vitesse d'exécution entre le séquentiel et le parallèle avec le parallèle environ 40 fois plus lent que le séquentiel pour exactement le même code !
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5 ds = xr.load_dataset('data.nc') t = ds.t.data s = ds.t.data.shape[0] vals = [copy.deepcopy(t) for d in range(0, s)]Là ce n'est pas une question de GIL ou autre lock parce que c'est la même instance de la variable passée par "pseudo"-référence mais alors c'est quoi qui bloque ?
Il y a quelque chose qui fait que si je passe une liste de "petites" valeurs simple comme paramètres au map de ProcessPoolExecutor il parallélise réellement alors que si c'est une "grosse" (pas gigantesque non plus) structure de données de type array numpy ça casse tout. J'ai même essayé en transformant l'array numpy en liste, ayant lu que ProcessPoolExecutor requiert des objets picklable et qu'il y aurait des soucis avec les ndarray mais ça n'a rien changé.
Est-ce que c'est sans espoir ou est-ce qu'il y a moyen de paralléliser l'extraction de données depuis des arrays numpy ?
Merci !![]()
Partager