Bonjour,

Je veux paralléliser un code d'extraction de données depuis un dataset netCDF et j'obtiens une performance moins bonne que si je ne parallélise pas.

Ce n'est pas un problème de ProcessPoolExecutor à proprement parler car avec un cas test j'obtiens bien une accélération lorsque c'est parallélisé. Voici le code (attention, c'est basique, pas beau et tout mais c'est juste un test pour valider le principe) de mon cas test :

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
 
#!/usr/bin/env python3
# coding: utf-8
###############################################################################
"""\
ppp.py
----------------------
Python Parallelisation Performance
Compute sum of prime numbers in a range.
 
usage:
    ppp.py [-p] [-n <nb_cores>]
 
options:
    -n <nb_cores>  Use <nb_cores> local cores to extract data
        Default: 1
    -p  Turn on parallelism.
 
"""
 
import sys
import argparse
import xarray as xr
import concurrent.futures
import time
 
 
def parse_args():
    """
    Parse arguments
    """
    parser = argparse.ArgumentParser(
        description="Compute sum of prime \
            numbers in a range"
        )
    parser.add_argument(
        "-p",
        action="store_true",
        help="use multiprocessing",
    )
    parser.add_argument(
        "-n",
        type=int,
        help="number of cores to use in parallel processing",
        default=1,
    )
    args = parser.parse_args()
    return args
 
 
def task(nb: int) -> int:
    out = 0
    if nb < 1:
        print("Cannot compute primes of numbers less than 1")
        sys.exit(-1)
    for i in range(1, nb):
        if (nb % i) == 0:
            out += i
    return out
 
 
def main(argv: sys.argv):
    """
    Main execution function.
    """
    # =====================
    # Parse arguments
    # =====================
    args = parse_args()
    # Profiling
    start = time.perf_counter()
    # ---
    # Values to be submitted to task
    vals = [10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000]
    # ---
    # Actual function
    # TODO: get parallelism to provide speedup over the simple linear version
    if args.p:
        # Multiprocessing call to the wrapper extraction function
        max_workers = min(32, int(args.n) + 4)
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=max_workers
        ) as executor:
            res = executor.map(
                task,
                vals,
            )
        for elem in res:
            print(elem)
    else:
        for elem in vals:
            res = task(elem)
            print(res)
    # ---
    end = time.perf_counter()
    print(f"Execution time: {end - start} seconds.")
 
 
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
On a un gain pas fou mais réel quand on passe en parallélisé :

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
 
$ ./ppp.py -n 8
...
Execution time: 16.775680779999675 seconds.
$ ./ppp.py -n 8 -p
...
Execution time: 4.326744636000512 seconds.
Je passe une bête liste
Code : Sélectionner tout - Visualiser dans une fenêtre à part
vals = [10000000, 10000000, ...]
avec toujours la même valeur au map, ça ça fonctionne.

Par contre, quand je reprends ce code et que je l'adapte à ce que je veux faire (lire un dataset netCDF et extraire les données contenues dedans vers des fichiers texte) le comportement est inverse avec une exécution parallèle environ 40 fois plus lente que l'exécution séquentielle !

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
 
#!/usr/bin/env python3
# coding: utf-8
###############################################################################
"""\
ppp.py
----------------------
Python Parallelisation Performance
Extract information from ECMWF ERA5 model data for use in atmospheric delay correction
computation.
 
usage:
    ppp.py [-p] [-n <nb_cores>]
 
options:
    -n <nb_cores>  Use <nb_cores> local cores to extract data
        Default: 1
    -p  Turn on parallelism.
 
"""
 
import sys
import argparse
import xarray as xr
from pathlib import Path
import concurrent.futures
import time
 
 
def parse_args():
    """
    Parse arguments
    """
    parser = argparse.ArgumentParser(
        description="Extract atmospheric correction \
            data from ECMWF ERA5 necdf files"
        )
    parser.add_argument(
        "-p",
        action="store_true",
        help="use multiprocessing",
    )
    parser.add_argument(
        "-n",
        type=int,
        help="number of cores to use in parallel processing",
        default=1,
    )
    args = parser.parse_args()
    return args
 
 
def task(d: list):
    """
    Read first element of dataset and return it as a string
    """
    out = d[0, 0, 0, 0]
    return f"{str(out)}\n"
 
 
def main(argv: sys.argv):
    """
    Main execution function.
    """
    # =====================
    # Parse arguments
    # =====================
    args = parse_args()
    # Profiling
    start = time.perf_counter()
    # ---
    # Values to be submitted to task
    ds = xr.load_dataset('data.nc')
    s = ds.t.data.shape[0]
    # Build list of values to be mapped to function
    vals = [ds.t.data for d in range(0, s)]
    # --------------------------------------
    # ---
    # Actual function
    # TODO: get parallelism to provide speedup over the simple linear version
    if args.p:
        # Multiprocessing call to the wrapper extraction function
        max_workers = min(32, int(args.n) + 4)
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=max_workers
        ) as executor:
            res = executor.map(
                task,
                vals,
            )
        outtext = ''
        for val in res:
            outtext += val
        outfile = Path('checkres.txt')
        outfile.write_text(outtext)
    else:
        outtext = ''
        for elem in vals:
            outtext += task(elem)
        outfile = Path('checkres.txt')
        outfile.write_text(outtext)
    # ---
    end = time.perf_counter()
    print(f"Execution time: {end - start} seconds.")
 
 
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
La seule chose qui change c'est que vals est maintenant

Code : Sélectionner tout - Visualiser dans une fenêtre à part
vals = [ds.t.data for d in range(0, s)]
avec ds qui est le dataset netCDF lu avec xarray

Code : Sélectionner tout - Visualiser dans une fenêtre à part
ds = xr.load_dataset('data.nc')
ds est un xarray.core.dataset.Dataset et t est un numpy.ndarray contenant des float32

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
 
>>> ds.t.data.shape
(540, 37, 21, 21)
>>> type(ds.t.data)
<class 'numpy.ndarray'>
>>> ds.t.data[0,0,0,0]
np.float32(285.86597)

et data.nc est obtenu auprès du climate data store ECMWF avec cette requête pour ceux qui seraient curieux (je ne colle pas les données ici car ce petit dataset de test fait tout de même 55Mo, ça ne passe pas en pj)

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 
#!/usr/bin/env python3
 
import cdsapi
 
# For a standard test dataset
dataset = "reanalysis-era5-pressure-levels"
request = {
    "product_type": ["reanalysis"],
    "variable": [
        "temperature"
    ],
    "year": [
        "2014", "2015", "2016",
        "2017", "2018", "2019",
        "2020", "2021", "2022"
    ],
    "month": [
        "01", "02", "03",
        "04", "05", "06",
        "07", "08", "09",
        "10", "11", "12"
    ],
    "day": [
        "03", "09", "15",
        "21", "27"
    ],
    "time": ["14:00"],
    "pressure_level": [
        "1", "2", "3",
        "5", "7", "10",
        "20", "30", "50",
        "70", "100", "125",
        "150", "175", "200",
        "225", "250", "300",
        "350", "400", "450",
        "500", "550", "600",
        "650", "700", "750",
        "775", "800", "825",
        "850", "875", "900",
        "925", "950", "975",
        "1000"
    ],
    "data_format": "netcdf",
    "download_format": "unarchived",
    "area": [46.5, 1, 41.5, 6]
}
 
client = cdsapi.Client()
client.retrieve(dataset, request, target='data.nc')
Et là, la méthode non parallélisée
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
 
        for elem in vals:
            outtext += task(elem)
est plus rapide que la version parallélisée

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
 
$ ./ppp.py -n 8
Execution time: 1.4252492609994079 seconds.
$ ./ppp.py -n 8 -p
Execution time: 85.55303567800001 seconds.
Là il y a un truc qui ne va vraiment pas et j'ai beau chercher je n'arrive pas à comprendre, sauf que je me doute qu'il y a un truc pas net au niveau de la construction de la liste des paramètres qui est passée au map.

Contrairement au cas test du premier code où quand on le fait tourner en parallèle on voit bien tous les coeurs d'exécution être chargés à 100%, dans le second code on voit que c'est limité à l'équivalent d'un seul coeur d'exécution à 100%, réparti sur le nombre de coeurs sélectionnées par le paramètre n. On dirait que c'est comme si le dataset généré par ds.t.data était le même pour toutes les instances et qu'il était bloqué à une lecture unique à la fois par une sorte de GIL (qui ne devrait pas jouer ici car c'est des Process, pas des Thread mais bon, c'est l'idée).

J'ai essayé tout ce qui m'est venu à l'esprit pour construire la liste des paramètres "vals", allant de la simple affectation du dataset déjà lu dans une variable jusquà carrément de relire le fichier netCDF pour chaque élément de la liste, ce qui ralentit sacrément le code mais n'améliore pas la performance de la version parallélisée versus le séquentiel, rien n'y fait

J'ai remarqué que toutes ces méthodes n'entraînait pas une augmentation significative de la mémoire, ça donne vraiment l'impression que python reconnaît qu'il s'agit de n fois des mêmes données et fait référence n fois à une même zone mémoire.

Du coup j'ai pris une méthode radicale, lecture du dataset dans une variable puis un copy.deepcopy pour chaque instance de la liste

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
 
    ds = xr.load_dataset('data.nc')
    t = ds.t.data
    s = ds.t.data.shape[0]
    vals = [copy.deepcopy(t) for d in range(0, s)]
avec cette méthode, je vois bien la consommation mémoire exploser au début de l'exécution, correspondant donc bien aux n deepcopy de la construction de la liste mais toujours le même ratio de vitesse d'exécution entre le séquentiel et le parallèle avec le parallèle environ 40 fois plus lent que le séquentiel pour exactement le même code ! Là ce n'est pas une question de GIL ou autre lock parce que c'est la même instance de la variable passée par "pseudo"-référence mais alors c'est quoi qui bloque ?

Il y a quelque chose qui fait que si je passe une liste de "petites" valeurs simple comme paramètres au map de ProcessPoolExecutor il parallélise réellement alors que si c'est une "grosse" (pas gigantesque non plus) structure de données de type array numpy ça casse tout. J'ai même essayé en transformant l'array numpy en liste, ayant lu que ProcessPoolExecutor requiert des objets picklable et qu'il y aurait des soucis avec les ndarray mais ça n'a rien changé.

Est-ce que c'est sans espoir ou est-ce qu'il y a moyen de paralléliser l'extraction de données depuis des arrays numpy ?

Merci !