IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Navigation

Inscrivez-vous gratuitement
pour pouvoir participer, suivre les réponses en temps réel, voter pour les messages, poser vos propres questions et recevoir la newsletter

Python Discussion :

Parallelisation effective avec concurrent.futures.ProcessPoolExecutor dans certains cas


Sujet :

Python

  1. #1
    Membre averti
    Homme Profil pro
    Inscrit en
    Septembre 2011
    Messages
    16
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France

    Informations forums :
    Inscription : Septembre 2011
    Messages : 16
    Par défaut Parallelisation effective avec concurrent.futures.ProcessPoolExecutor dans certains cas
    Bonjour,

    Je veux paralléliser un code d'extraction de données depuis un dataset netCDF et j'obtiens une performance moins bonne que si je ne parallélise pas.

    Ce n'est pas un problème de ProcessPoolExecutor à proprement parler car avec un cas test j'obtiens bien une accélération lorsque c'est parallélisé. Voici le code (attention, c'est basique, pas beau et tout mais c'est juste un test pour valider le principe) de mon cas test :

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
     
    #!/usr/bin/env python3
    # coding: utf-8
    ###############################################################################
    """\
    ppp.py
    ----------------------
    Python Parallelisation Performance
    Compute sum of prime numbers in a range.
     
    usage:
        ppp.py [-p] [-n <nb_cores>]
     
    options:
        -n <nb_cores>  Use <nb_cores> local cores to extract data
            Default: 1
        -p  Turn on parallelism.
     
    """
     
    import sys
    import argparse
    import xarray as xr
    import concurrent.futures
    import time
     
     
    def parse_args():
        """
        Parse arguments
        """
        parser = argparse.ArgumentParser(
            description="Compute sum of prime \
                numbers in a range"
            )
        parser.add_argument(
            "-p",
            action="store_true",
            help="use multiprocessing",
        )
        parser.add_argument(
            "-n",
            type=int,
            help="number of cores to use in parallel processing",
            default=1,
        )
        args = parser.parse_args()
        return args
     
     
    def task(nb: int) -> int:
        out = 0
        if nb < 1:
            print("Cannot compute primes of numbers less than 1")
            sys.exit(-1)
        for i in range(1, nb):
            if (nb % i) == 0:
                out += i
        return out
     
     
    def main(argv: sys.argv):
        """
        Main execution function.
        """
        # =====================
        # Parse arguments
        # =====================
        args = parse_args()
        # Profiling
        start = time.perf_counter()
        # ---
        # Values to be submitted to task
        vals = [10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000, 10000000]
        # ---
        # Actual function
        # TODO: get parallelism to provide speedup over the simple linear version
        if args.p:
            # Multiprocessing call to the wrapper extraction function
            max_workers = min(32, int(args.n) + 4)
            with concurrent.futures.ProcessPoolExecutor(
                max_workers=max_workers
            ) as executor:
                res = executor.map(
                    task,
                    vals,
                )
            for elem in res:
                print(elem)
        else:
            for elem in vals:
                res = task(elem)
                print(res)
        # ---
        end = time.perf_counter()
        print(f"Execution time: {end - start} seconds.")
     
     
    if __name__ == "__main__":
        sys.exit(main(sys.argv[1:]))
    On a un gain pas fou mais réel quand on passe en parallélisé :

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
     
    $ ./ppp.py -n 8
    ...
    Execution time: 16.775680779999675 seconds.
    $ ./ppp.py -n 8 -p
    ...
    Execution time: 4.326744636000512 seconds.
    Je passe une bête liste
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    vals = [10000000, 10000000, ...]
    avec toujours la même valeur au map, ça ça fonctionne.

    Par contre, quand je reprends ce code et que je l'adapte à ce que je veux faire (lire un dataset netCDF et extraire les données contenues dedans vers des fichiers texte) le comportement est inverse avec une exécution parallèle environ 40 fois plus lente que l'exécution séquentielle !

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
     
    #!/usr/bin/env python3
    # coding: utf-8
    ###############################################################################
    """\
    ppp.py
    ----------------------
    Python Parallelisation Performance
    Extract information from ECMWF ERA5 model data for use in atmospheric delay correction
    computation.
     
    usage:
        ppp.py [-p] [-n <nb_cores>]
     
    options:
        -n <nb_cores>  Use <nb_cores> local cores to extract data
            Default: 1
        -p  Turn on parallelism.
     
    """
     
    import sys
    import argparse
    import xarray as xr
    from pathlib import Path
    import concurrent.futures
    import time
     
     
    def parse_args():
        """
        Parse arguments
        """
        parser = argparse.ArgumentParser(
            description="Extract atmospheric correction \
                data from ECMWF ERA5 necdf files"
            )
        parser.add_argument(
            "-p",
            action="store_true",
            help="use multiprocessing",
        )
        parser.add_argument(
            "-n",
            type=int,
            help="number of cores to use in parallel processing",
            default=1,
        )
        args = parser.parse_args()
        return args
     
     
    def task(d: list):
        """
        Read first element of dataset and return it as a string
        """
        out = d[0, 0, 0, 0]
        return f"{str(out)}\n"
     
     
    def main(argv: sys.argv):
        """
        Main execution function.
        """
        # =====================
        # Parse arguments
        # =====================
        args = parse_args()
        # Profiling
        start = time.perf_counter()
        # ---
        # Values to be submitted to task
        ds = xr.load_dataset('data.nc')
        s = ds.t.data.shape[0]
        # Build list of values to be mapped to function
        vals = [ds.t.data for d in range(0, s)]
        # --------------------------------------
        # ---
        # Actual function
        # TODO: get parallelism to provide speedup over the simple linear version
        if args.p:
            # Multiprocessing call to the wrapper extraction function
            max_workers = min(32, int(args.n) + 4)
            with concurrent.futures.ProcessPoolExecutor(
                max_workers=max_workers
            ) as executor:
                res = executor.map(
                    task,
                    vals,
                )
            outtext = ''
            for val in res:
                outtext += val
            outfile = Path('checkres.txt')
            outfile.write_text(outtext)
        else:
            outtext = ''
            for elem in vals:
                outtext += task(elem)
            outfile = Path('checkres.txt')
            outfile.write_text(outtext)
        # ---
        end = time.perf_counter()
        print(f"Execution time: {end - start} seconds.")
     
     
    if __name__ == "__main__":
        sys.exit(main(sys.argv[1:]))
    La seule chose qui change c'est que vals est maintenant

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    vals = [ds.t.data for d in range(0, s)]
    avec ds qui est le dataset netCDF lu avec xarray

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    ds = xr.load_dataset('data.nc')
    ds est un xarray.core.dataset.Dataset et t est un numpy.ndarray contenant des float32

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
     
    >>> ds.t.data.shape
    (540, 37, 21, 21)
    >>> type(ds.t.data)
    <class 'numpy.ndarray'>
    >>> ds.t.data[0,0,0,0]
    np.float32(285.86597)

    et data.nc est obtenu auprès du climate data store ECMWF avec cette requête pour ceux qui seraient curieux (je ne colle pas les données ici car ce petit dataset de test fait tout de même 55Mo, ça ne passe pas en pj)

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
     
    #!/usr/bin/env python3
     
    import cdsapi
     
    # For a standard test dataset
    dataset = "reanalysis-era5-pressure-levels"
    request = {
        "product_type": ["reanalysis"],
        "variable": [
            "temperature"
        ],
        "year": [
            "2014", "2015", "2016",
            "2017", "2018", "2019",
            "2020", "2021", "2022"
        ],
        "month": [
            "01", "02", "03",
            "04", "05", "06",
            "07", "08", "09",
            "10", "11", "12"
        ],
        "day": [
            "03", "09", "15",
            "21", "27"
        ],
        "time": ["14:00"],
        "pressure_level": [
            "1", "2", "3",
            "5", "7", "10",
            "20", "30", "50",
            "70", "100", "125",
            "150", "175", "200",
            "225", "250", "300",
            "350", "400", "450",
            "500", "550", "600",
            "650", "700", "750",
            "775", "800", "825",
            "850", "875", "900",
            "925", "950", "975",
            "1000"
        ],
        "data_format": "netcdf",
        "download_format": "unarchived",
        "area": [46.5, 1, 41.5, 6]
    }
     
    client = cdsapi.Client()
    client.retrieve(dataset, request, target='data.nc')
    Et là, la méthode non parallélisée
    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
     
            for elem in vals:
                outtext += task(elem)
    est plus rapide que la version parallélisée

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
     
    $ ./ppp.py -n 8
    Execution time: 1.4252492609994079 seconds.
    $ ./ppp.py -n 8 -p
    Execution time: 85.55303567800001 seconds.
    Là il y a un truc qui ne va vraiment pas et j'ai beau chercher je n'arrive pas à comprendre, sauf que je me doute qu'il y a un truc pas net au niveau de la construction de la liste des paramètres qui est passée au map.

    Contrairement au cas test du premier code où quand on le fait tourner en parallèle on voit bien tous les coeurs d'exécution être chargés à 100%, dans le second code on voit que c'est limité à l'équivalent d'un seul coeur d'exécution à 100%, réparti sur le nombre de coeurs sélectionnées par le paramètre n. On dirait que c'est comme si le dataset généré par ds.t.data était le même pour toutes les instances et qu'il était bloqué à une lecture unique à la fois par une sorte de GIL (qui ne devrait pas jouer ici car c'est des Process, pas des Thread mais bon, c'est l'idée).

    J'ai essayé tout ce qui m'est venu à l'esprit pour construire la liste des paramètres "vals", allant de la simple affectation du dataset déjà lu dans une variable jusquà carrément de relire le fichier netCDF pour chaque élément de la liste, ce qui ralentit sacrément le code mais n'améliore pas la performance de la version parallélisée versus le séquentiel, rien n'y fait

    J'ai remarqué que toutes ces méthodes n'entraînait pas une augmentation significative de la mémoire, ça donne vraiment l'impression que python reconnaît qu'il s'agit de n fois des mêmes données et fait référence n fois à une même zone mémoire.

    Du coup j'ai pris une méthode radicale, lecture du dataset dans une variable puis un copy.deepcopy pour chaque instance de la liste

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
     
        ds = xr.load_dataset('data.nc')
        t = ds.t.data
        s = ds.t.data.shape[0]
        vals = [copy.deepcopy(t) for d in range(0, s)]
    avec cette méthode, je vois bien la consommation mémoire exploser au début de l'exécution, correspondant donc bien aux n deepcopy de la construction de la liste mais toujours le même ratio de vitesse d'exécution entre le séquentiel et le parallèle avec le parallèle environ 40 fois plus lent que le séquentiel pour exactement le même code ! Là ce n'est pas une question de GIL ou autre lock parce que c'est la même instance de la variable passée par "pseudo"-référence mais alors c'est quoi qui bloque ?

    Il y a quelque chose qui fait que si je passe une liste de "petites" valeurs simple comme paramètres au map de ProcessPoolExecutor il parallélise réellement alors que si c'est une "grosse" (pas gigantesque non plus) structure de données de type array numpy ça casse tout. J'ai même essayé en transformant l'array numpy en liste, ayant lu que ProcessPoolExecutor requiert des objets picklable et qu'il y aurait des soucis avec les ndarray mais ça n'a rien changé.

    Est-ce que c'est sans espoir ou est-ce qu'il y a moyen de paralléliser l'extraction de données depuis des arrays numpy ?

    Merci !

  2. #2
    Expert éminent
    Homme Profil pro
    Architecte technique retraité
    Inscrit en
    Juin 2008
    Messages
    21 577
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Manche (Basse Normandie)

    Informations professionnelles :
    Activité : Architecte technique retraité
    Secteur : Industrie

    Informations forums :
    Inscription : Juin 2008
    Messages : 21 577
    Par défaut
    Citation Envoyé par DrWaste Voir le message
    J'ai remarqué que toutes ces méthodes n'entraînait pas une augmentation significative de la mémoire, ça donne vraiment l'impression que python reconnaît qu'il s'agit de n fois des mêmes données et fait référence n fois à une même zone mémoire.
    Vous travaillez sur Linux, par défaut, les process sont crées via "fork" et la mémoire du parent ne sera recopiée que lors des écritures. Python n'y est pour rien.

    Citation Envoyé par DrWaste Voir le message
    Est-ce que c'est sans espoir ou est-ce qu'il y a moyen de paralléliser l'extraction de données depuis des arrays numpy ?
    Ce qu'on peut faire avec numpy est d'utiliser la mémoire partagée (du système) comme illustré ici.

    Pour le reste, paralléliser, c'est du boulot... car ce sont beaucoup de cas particuliers à résoudre.

    - W
    Architectures post-modernes.
    Python sur DVP c'est aussi des FAQs, des cours et tutoriels

Discussions similaires

  1. Réponses: 18
    Dernier message: 26/06/2011, 16h21
  2. Empecher la validation dans certains cas ?
    Par elitost dans le forum Struts 1
    Réponses: 2
    Dernier message: 19/09/2006, 14h03
  3. Passer la validation Javascript dans certains cas
    Par tkino dans le forum Struts 1
    Réponses: 2
    Dernier message: 31/07/2006, 16h37
  4. [JOINTURES] interêt dans certain cas
    Par nicotine002 dans le forum Langage SQL
    Réponses: 4
    Dernier message: 13/06/2006, 18h22
  5. [POO] Modèle objet: this inutilisable dans certains cas?
    Par vlord dans le forum Général JavaScript
    Réponses: 13
    Dernier message: 13/08/2005, 11h41

Partager

Partager
  • Envoyer la discussion sur Viadeo
  • Envoyer la discussion sur Twitter
  • Envoyer la discussion sur Google
  • Envoyer la discussion sur Facebook
  • Envoyer la discussion sur Digg
  • Envoyer la discussion sur Delicious
  • Envoyer la discussion sur MySpace
  • Envoyer la discussion sur Yahoo