IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Navigation

Inscrivez-vous gratuitement
pour pouvoir participer, suivre les réponses en temps réel, voter pour les messages, poser vos propres questions et recevoir la newsletter

Python Discussion :

Problématique du partage de données dans un contexte de multiprocessing


Sujet :

Python

Vue hybride

Message précédent Message précédent   Message suivant Message suivant
  1. #1
    Candidat au Club
    Homme Profil pro
    sigiste
    Inscrit en
    Octobre 2024
    Messages
    3
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Seine Saint Denis (Île de France)

    Informations professionnelles :
    Activité : sigiste

    Informations forums :
    Inscription : Octobre 2024
    Messages : 3
    Par défaut Problématique du partage de données dans un contexte de multiprocessing
    Bonjour,

    Je suis un peu novice sur l utilisation de python et des multi process, je rencontre quelques problèmes pour les variables.

    Dans mon script Python (calcul de basqsin versant), j'ai une fonction appelée initializer_worker() qui est responsable de réaliser des calculs pour obtenir deux variables issues de raster : les directions de flux (fdir) et l'accumulation de flux (acc).
    Ces variables doivent être calculées une seule fois au début du processus global et ensuite utilisées dans plusieurs processus enfants pour effectuer des calculs de la délimitation de bassins versants Ã* partir de coordonnées xy des stations.

    J'ai énormément de paire xy(definit une stations) donc j'ai initié :

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    chunks = [stations_df.iloc[i:i + 200] for i in range(0, len(stations_df), 200)]
    pour ne pas tt calculé d"un seul coup

    si j 'initialise la fonction dans le ProcessPoolExecutor

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
       with ProcessPoolExecutor(max_workers=8, initializer=initializer_worker, initargs=(elevation,)) as executor:
            futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
    J'ai bien mes calculs mais par contre initializer_worker est lancé plusieurs fois et c'est ce qui demande le plus de temps dans le traitement

    et si j'essaye de lui donner ttes le variables de cette fonction

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
          with ProcessPoolExecutor(max_workers=7) as executor:
            futures = {executor.submit(process_chunk, dirmap, fdir, acc, chunk): chunk for chunk in chunks}
    j ai des retour d erreur 'NoneType' object has no attribute 'mask' donc les variables ne sont pas transmises.

    J insérée mon code ici et toute aide, lien est la bienvenue ( désolé si il y a de choses grossière en python...)


    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
     
    import time
    import os
    import pandas as pd
    import geopandas as gpd
    from shapely import geometry, ops
    import rasterio
    from pysheds.grid import Grid
    from concurrent.futures import ProcessPoolExecutor, as_completed
    from export_logging import create_logger
     
     
     
     
    #répertoires et du logger
    dossier_script = os.path.dirname(os.path.abspath(__file__))
    dossier_log = os.path.join(dossier_script, 'log')
    os.makedirs(dossier_log, exist_ok=True)
    log = create_logger(os.path.join(dossier_log, 'transfert_process.log'))
    elevation = r'....fr_75m_wgs84.tif'
     
    stations = pd.read_csv(
        r'....stations.csv',
        sep=';')
     
    def lecture_x_y(listexy):
        '''lecture de coordonnees geo (xy)'''
        stations_df = listexy[['pop_id', 'x', 'y']].drop_duplicates()
        stations_df = stations_df.head(200)
     
        print(time.strftime('%H:%M:%S'), 'lecture stations')
     
        return stations_df
     
    def initializer_worker(elevation):
     
        #global grid, dem, fdir, acc
        grid = Grid.from_raster(elevation)
        dem = grid.read_raster(elevation)
        pit_filled_dem = grid.fill_pits(dem)
        flooded_dem = grid.fill_depressions(pit_filled_dem)
        inflated_dem = grid.resolve_flats(flooded_dem)
        dirmap = (64, 128, 1, 2, 4, 8, 16, 32)
        fdir = grid.flowdir(inflated_dem, dirmap=dirmap)
        acc = grid.accumulation(fdir, dirmap=dirmap)
        if acc is None:
            print("Erreur : 'acc' est None après calcul")
        print("process initialisation des parametre de direction et d'accumulation des flux... ok")
        return dirmap, fdir, acc
     
     
    def delineate_catchment(x, y, station_id, acc, dirmap, fdir):
     
        """ fonction de délimitation des bassins versants """
        log_file = os.path.join(dossier_log, f'log_station_{station_id}.log')
        log = create_logger(log_file)
        grid = Grid.from_raster(elevation)
        try:
            with rasterio.Env():
                # on permet au bassin versant de coller au xy
                x_snap, y_snap = grid.snap_to_mask(acc > 1200, (x, y))
     
                # on vérifie que la station est Ã* l'intérieur du raster
                if x_snap is None or y_snap is None:
                    log.error(f"Station {station_id} en dehors de l'emprise du raster.")
                    print(f"Station {station_id} en dehors de l'emprise du raster.")
                    return None, station_id
                # print ('stations ok')
     
                # calcul du bassin
                catch = grid.catchment(x=x_snap, y=y_snap, fdir=fdir, dirmap=dirmap, xytype='coordinate')
                # on decoupe
                grid.clip_to(catch)
                # on trasforme en polygones
                shapes = grid.polygonize()
     
                # vérifiesi le bassin versant est vide
                if not shapes:
                    print(f"Bassin versant vide ou trop petit pour la station {station_id}")
                    log.warning(f"Bassin versant vide ou trop petit pour la station {station_id}")
                    return None, station_id
                # print('shape ok')
                catchment_polygon = ops.unary_union([geometry.shape(shape) for shape, value in shapes])
     
                # Vérifie si le polygone retourné est vide
                if catchment_polygon.is_empty:
                    print(f"Bassin versant trop petit pour la station {station_id}")
                    log.warning(f"Bassin versant trop petit pour la station {station_id}")
                    return None, station_id
     
                log.info(f"Bassin versant délimité pour la station {station_id}")
                return catchment_polygon, station_id
        except Exception as e:
            log.error(f"Erreur pour la station {station_id}: {e}")
            import traceback, sys
            tb = sys.exc_info()[2]
            print("erreur ligne {0} -  {1} \n".format(tb.tb_lineno, e))
            return None, station_id
    #
    def process_chunk(dirmap, fdir, acc, chunk):
     
        """Traitement par lots de stations"""
     
        results = []
        for row in chunk.itertuples():
            try:
                result = delineate_catchment(row.x, row.y, row.pop_id,acc, dirmap, fdir)
                if result[0] is not None:
                    results.append(result)
            except Exception as e:
                log.error(f"Erreur lors du traitement de la station {row.pop_id}: {e}")
        return results
     
     
    if __name__ == '__main__':
        start = time.time()
     
     
        log.info("Début")
        print(time.strftime('%H:%M:%S'), 'Début')
        dirmap, fdir, acc = initializer_worker(elevation)
     
        stations_df = lecture_x_y(stations)
        # diviser les stations en morceaux pour le traitement par lots
        chunks = [stations_df.iloc[i:i + 200] for i in range(0, len(stations_df), 200)]
     
        all_catchment_polygons = []
        all_station_ids = []
     
     
        print(time.strftime('%H:%M:%S'), 'envoi')
     
        with ProcessPoolExecutor(max_workers=7) as executor:
            futures = {executor.submit(process_chunk, dirmap, fdir, acc, chunk): chunk for chunk in chunks}
     
            for future in as_completed(futures):
                chunk_index = list(futures.keys()).index(future)
                try:
                    result = future.result()
                    if result:
                        for catchment_polygon, station_id in result:
                            all_catchment_polygons.append(catchment_polygon)
                            all_station_ids.append(station_id)
                    print(f"Chunk {chunk_index + 1}/{len(chunks)} processed")
                except Exception as e:
                    print(f"Error processing chunk {chunk_index + 1}: {e}")
     
     
        gdf = gpd.GeoDataFrame({'pop_id': all_station_ids, 'geometry': all_catchment_polygons}, crs='EPSG:4326')
        print(time.strftime('%H:%M:%S', time.gmtime(time.time() - start)))
     
        dossier_resultat = os.path.join(dossier_script, 'resultat')
        os.makedirs(dossier_resultat, exist_ok=True)
        output_shapefile = os.path.join(dossier_script, 'resultat/bv_75m.shp')
        gdf.to_file(output_shapefile)
        log.info('Fin')
        print(time.strftime('%H:%M:%S'), 'fin')

  2. #2
    Expert éminent
    Homme Profil pro
    Architecte technique retraité
    Inscrit en
    Juin 2008
    Messages
    21 754
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Manche (Basse Normandie)

    Informations professionnelles :
    Activité : Architecte technique retraité
    Secteur : Industrie

    Informations forums :
    Inscription : Juin 2008
    Messages : 21 754
    Par défaut
    Citation Envoyé par bern111 Voir le message
    j ai des retour d erreur 'NoneType' object has no attribute 'mask' donc les variables ne sont pas transmises.
    Il s'agit d'erreur lorsque l'objet correspondant (à la variable) n'est pas du type attendu par la fonction appelée. Et effectivement l'objet None n'a pas d'attribut "mask" mais si l'appelant transmet None ou que l'appelée parcours un réticule où il récupère des None là où il devrait trouver d'autres objets... Ca plante.

    Ceci dit, ce n'est pas de la programmation pour débutant et pas grand monde aura le temps de décortiquer ce que vous avez fait pour trouver l'erreur d'autant que vous vous plaignez des erreurs sans trop insister sur le plus important: ce code est supposé accomplir (l'objectif à atteindre).

    Si vous débutez, vous ne savez pas trop programmer, difficile de croire que vous avez mis les bons moyens en face. Il faudra sans doute revoir la conception... ce qui aboutira à écrire un autre code qui pourra être fonctionnel sans avoir eu à résoudre/répondre les problèmes/aux questions posées.

    Mais vous n'êtes pas à l'abri d'une bonne surprise: il vous suffit d'attendre... Et lorsque/si vous en aurez/avez marre (d'attendre), il vous faudra fournir un code plus simple qui permettra de reproduire les soucis rencontrés sans trop de difficultés (ce qui est le b.a.ba lorsqu'on demande de l'aide).

    Bon courage,

    - W
    Architectures post-modernes.
    Python sur DVP c'est aussi des FAQs, des cours et tutoriels

  3. #3
    Candidat au Club
    Homme Profil pro
    sigiste
    Inscrit en
    Octobre 2024
    Messages
    3
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Seine Saint Denis (Île de France)

    Informations professionnelles :
    Activité : sigiste

    Informations forums :
    Inscription : Octobre 2024
    Messages : 3
    Par défaut
    ok, je vois .. j aurais dû être plus précis.

    mon erreur est sur la ligne 66 de mon code

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    x_snap, y_snap = grid.snap_to_mask(acc > 1200, (x, y))
    'NoneType' object has no attribute 'mask'

    Pourtant le grid est bien remplit et n'est donc pas NoneType et il contient l'attribut mask.


    C'est l'utilisation de with ProcessPoolExecutor qui me pose problème. Lorsque j 'initialise la fonction dans le process pool, je n'ai pas l 'erreur...

    Mais avec futures = {executor.submit(process_chunk, dirmap, fdir, acc, chunk): chunk for chunk in chunks}, j obtiens cet erreur que je n'arrive pas à comprendre et c'était donc à ce niveau que je demandais de l' aide.



    J'ai finalement opté pour lancé tous les points xy sans faire de paquet de 200 et modifier ces lignes dans le code

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
     
    #chunks = [stations_df.iloc[i:i + 200] for i in range(0, len(stations_df), 200)]
     
        chunks = stations_df
     
     
        print(time.strftime('%H:%M:%S'), 'envoi')
     
        with ProcessPoolExecutor(max_workers=7, initializer=initializer_worker, initargs=(elevation,)) as executor:
            futures = {executor.submit(process_chunk, chunks)}

    Le traitement prend du temps (15 000 points >>8h de calcul ) mais j ai encore de la marge sur les max_workers et je peux surement mieux faire pour prendre moins de mémoire surtout sur le découpage du grip, et de peut être transformer au préalable en tableau numpy
    ces deux lignes .
    *
    fdir = grid.flowdir(inflated_dem, dirmap=dirmap)
    acc = grid.accumulation(fdir, dirmap=dirmap)


    8h de calcul pour un raster 75par 75m et l objectif est au moins 25m par 25m pour lequel je manque de mémoire pour l instant.

    (pas totalement novice mais conscient d'avoir plein de choses à découvrir)

  4. #4
    Expert éminent
    Homme Profil pro
    Architecte technique retraité
    Inscrit en
    Juin 2008
    Messages
    21 754
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Manche (Basse Normandie)

    Informations professionnelles :
    Activité : Architecte technique retraité
    Secteur : Industrie

    Informations forums :
    Inscription : Juin 2008
    Messages : 21 754
    Par défaut
    Salut,

    Citation Envoyé par bern111 Voir le message
    ok, je vois .. j aurais dû être plus précis.

    mon erreur est sur la ligne 66 de mon code

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    x_snap, y_snap = grid.snap_to_mask(acc > 1200, (x, y))
    'NoneType' object has no attribute 'mask'
    Est ce que l'attribut mask est accédé à cette ligne? Ce n'est pas "grid" qui est à None... et vous ne montrez qu'un bout du message d'erreur/exception de python.

    Vous racontez une histoire qu'on ne peut ni vérifier, ni reproduire,...

    Citation Envoyé par bern111 Voir le message
    Mais avec futures = {executor.submit(process_chunk, dirmap, fdir, acc, chunk): chunk for chunk in chunks}, j obtiens cet erreur que je n'arrive pas à comprendre et c'était donc à ce niveau que je demandais de l' aide.
    Cette ligne ne provoque pas d'erreur par contre vous êtes convaincu que c'est ce changement dans votre code qui... conviction qui vient d'une expérience que vous avez des difficultés à partager.

    Or pour comprendre, il va falloir reproduire...
    on entend/lit ce que vous racontez mais vous ne donnez pas grand chose pour aller plus loin.

    - W
    Architectures post-modernes.
    Python sur DVP c'est aussi des FAQs, des cours et tutoriels

  5. #5
    Membre prolifique
    Avatar de Sve@r
    Homme Profil pro
    Ingénieur développement logiciels
    Inscrit en
    Février 2006
    Messages
    12 839
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Oise (Picardie)

    Informations professionnelles :
    Activité : Ingénieur développement logiciels
    Secteur : Aéronautique - Marine - Espace - Armement

    Informations forums :
    Inscription : Février 2006
    Messages : 12 839
    Billets dans le blog
    1
    Par défaut
    Bonjour
    Citation Envoyé par bern111 Voir le message
    mon erreur est sur la ligne 66 de mon code

    Code : Sélectionner tout - Visualiser dans une fenêtre à part
    x_snap, y_snap = grid.snap_to_mask(acc > 1200, (x, y))
    'NoneType' object has no attribute 'mask'
    Cette ligne ne montre pas d'instruction type xxx.mask. Pourtant c'est cette instruction qui demande l'utilisation d'un attribut "mask" depuis un objet "xxx". Donc ce n'est pas la ligne qui provoque l'erreur.
    Il est souvent courant que l'erreur provienne de la ligne X qui appelle la ligne Y qui appelle la ligne Z et c'est la ligne Z qui est foireuse mais Python montre l'ensemble des appels. Un débutant peut alors être dérouté et croire que l'erreur vient de la ligne X.
    Ou alors l'erreur est plus exacement 'NoneType' object has no attribute 'snap_to_mask' et tu t'es trompé en la recopiant. Et dans ce cas ça veut dire que "grid" est None (reste à savoir pourquoi).

    Citation Envoyé par bern111 Voir le message
    Pourtant le grid est bien remplit et n'est donc pas NoneType et il contient l'attribut mask.
    A vérifier => print(grid, type(grid)) en ligne 65 (toujours afficher type(truc) quand on veut vérifier la nature réelle de "truc").
    Mon Tutoriel sur la programmation «Python»
    Mon Tutoriel sur la programmation «Shell»
    Sinon il y en a pleins d'autres. N'oubliez pas non plus les différentes faq disponibles sur ce site
    Et on poste ses codes entre balises [code] et [/code]

  6. #6
    Candidat au Club
    Homme Profil pro
    sigiste
    Inscrit en
    Octobre 2024
    Messages
    3
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Seine Saint Denis (Île de France)

    Informations professionnelles :
    Activité : sigiste

    Informations forums :
    Inscription : Octobre 2024
    Messages : 3
    Par défaut
    Merci pour vos réponses..

    Cette ligne ne montre pas d'instruction type xxx.mask oui je suis d'accord

    L'erreur renvoyé est bien mask
    'NoneType' object has no attribute mask'

    J'ai bien entendu mis des prints un peu partout pour comprendre le cheminement et mon grid contient a bien un attribut mask.

    wiztricks >>> j ai bien saisit ce que tu ma demandé, mais je suis encore dérouté par cette erreur...

    Je vais essayé d'être plus précis et de comprendre , surtout que le script se termine bien mais contrairement à ce que j ai dit il n'y a que un process qui s’enclenche...

  7. #7
    Expert éminent
    Homme Profil pro
    Architecte technique retraité
    Inscrit en
    Juin 2008
    Messages
    21 754
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : France, Manche (Basse Normandie)

    Informations professionnelles :
    Activité : Architecte technique retraité
    Secteur : Industrie

    Informations forums :
    Inscription : Juin 2008
    Messages : 21 754
    Par défaut
    Citation Envoyé par bern111 Voir le message
    Cette ligne ne montre pas d'instruction type xxx.mask oui je suis d'accord

    L'erreur renvoyé est bien mask
    'NoneType' object has no attribute mask'
    Un traceback Python est bien plus bavard que ça... et ne se réduit pas à la dernière ligne (sauf environnement d'exécution particulier).

    Citation Envoyé par bern111 Voir le message
    J'ai bien entendu mis des prints un peu partout pour comprendre le cheminement et mon grid contient a bien un attribut mask.
    Ajouter des prints sans avoir une idée de ce qu'on cherche est d'une utilité discutable et comme on va partir du traceback pour se poser des questions...

    Dit autrement, vous pouvez toujours affirmer que votre démarche est bonne mais comme elle n'aboutit à rien et qu'on ne peut pas vérifier qu'elle est "raisonnable", ça va nulle part.


    - W
    Architectures post-modernes.
    Python sur DVP c'est aussi des FAQs, des cours et tutoriels

Discussions similaires

  1. [Débutant] 'méthode', qui n'est pas valide dans le contexte donné
    Par DarwinTheBeagle dans le forum C#
    Réponses: 6
    Dernier message: 25/03/2023, 10h00
  2. [Débutant] Méthode n'est pas valide dans le contexte donné
    Par Blodsvept dans le forum C#
    Réponses: 9
    Dernier message: 23/06/2015, 09h48
  3. [RegEx] modification de caractère dans un context donné
    Par damaskinos dans le forum Langage
    Réponses: 7
    Dernier message: 08/04/2013, 17h33
  4. "nom dupliqué dans ce contexte"
    Par lgomez dans le forum Langage
    Réponses: 4
    Dernier message: 19/10/2005, 09h49
  5. [système] Comment ajouter un item dans le context menu de Windows ?
    Par ddmicrolog dans le forum API, COM et SDKs
    Réponses: 8
    Dernier message: 29/06/2005, 17h03

Partager

Partager
  • Envoyer la discussion sur Viadeo
  • Envoyer la discussion sur Twitter
  • Envoyer la discussion sur Google
  • Envoyer la discussion sur Facebook
  • Envoyer la discussion sur Digg
  • Envoyer la discussion sur Delicious
  • Envoyer la discussion sur MySpace
  • Envoyer la discussion sur Yahoo