IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Navigation

Inscrivez-vous gratuitement
pour pouvoir participer, suivre les réponses en temps réel, voter pour les messages, poser vos propres questions et recevoir la newsletter

C# Discussion :

Comment faire une queue de threads ?


Sujet :

C#

Vue hybride

Message précédent Message précédent   Message suivant Message suivant
  1. #1
    Expert confirmé
    Avatar de StringBuilder
    Homme Profil pro
    Chef de projets
    Inscrit en
    Février 2010
    Messages
    4 197
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 46
    Localisation : France, Rhône (Rhône Alpes)

    Informations professionnelles :
    Activité : Chef de projets
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Février 2010
    Messages : 4 197
    Billets dans le blog
    1
    Par défaut Comment faire une queue de threads ?
    Bonjour,

    Je dois compresser une liste de quelques centaines de fichiers au format GZip.

    Le serveur sur lequel ça tourne dispose de 16 coeurs.

    J'ai fait un programme mono-threadé, donc ça bouffe au max 6% du CPU, ce qui est loin d'être optimum.

    Je souhaite donc modifier mon programme de façon à traiter en // plusieurs fichiers.

    Je saurai lancer la compression de TOUS les fichiers en même temps à l'intérieur d'une boucle.
    Sauf que j'ai pas franchement envie de planter le serveur : je doute fortement que ce soit une bonne idée de lancée plusieurs centaines de threads en même temps.

    Ce que je souhaite faire, c'est une file d'attente contenant tous les fichiers, et n'en traiter que X en // (avec X = nb core - 1 par exemple).

    Seulement, je ne m'en sors pas.

    Voici (une partie) de mon code :
    Code csharp : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
     
                // Main
                List<FileInfo> FilesToCompress = null;
                FilesToCompress = GetAllFiles();
                var t = Task.Run(() => AsyncCompressAllFiles(FilesToCompress));
                t.Wait();
     
     
            public static void CompressFile(FileInfo fileToCompress)
            {
                using (FileStream originalFileStream = fileToCompress.OpenRead())
                {
                    Console.WriteLine("Compressing {0}.", fileToCompress.FullName);
                    using (FileStream compressedFileStream = File.Create(fileToCompress.FullName + ".gz"))
                    {
                        using (GZipStream compressionStream = new GZipStream(compressedFileStream, CompressionLevel.Optimal))
                        {
                            originalFileStream.CopyTo(compressionStream);
                        }
                    }
                    FileInfo info = new FileInfo(fileToCompress.FullName + ".gz");
                    Console.WriteLine("Compressed {0} from {1} to {2} bytes.", fileToCompress.FullName, fileToCompress.Length.ToString(), info.Length.ToString());
                }
                fileToCompress.Delete();
            }
     
            public static async Task AsyncCompressFile(FileInfo fileToCompress)
            {
                await Task.Run(() => CompressFile(fileToCompress));
            }
     
            public async static void CompressAllFiles(List<FileInfo> filesToCompress)
            {
                TaskQueue queue = new TaskQueue();
                foreach (FileInfo fileToCompress in filesToCompress)
                {
                    await queue.Enqueue(()=>AsyncCompressFile(fileToCompress));
                }
            }
     
         public class TaskQueue
        {
            private SemaphoreSlim semaphore;
            public TaskQueue()
            {
                if (Environment.ProcessorCount <= 1)
                {
                    semaphore = new SemaphoreSlim(1);
                }
                else if (Environment.ProcessorCount > 32)
                {
                    semaphore = new SemaphoreSlim(31);
                }
                else
                {
                    semaphore = new SemaphoreSlim(Environment.ProcessorCount - 1);
                }
     
            }
     
            public async Task Enqueue(Func<Task> taskGenerator)
            {
                await semaphore.WaitAsync();
                try
                {
                    await taskGenerator();
                }
                finally
                {
                    semaphore.Release();
                }
            }
        }

    J'ai trouvé la classe TaskQueue sur un forum, et elle semble, de ce que j'en comprends, faire ce que je veux.

    Au détail près que lors de son appel, un seul thread est lancé à la fois.
    Sur ma machine de dev qui a 4 processeurs logiques, je m'attends à en vois 3 tourner en même temps...

  2. #2
    Membre chevronné
    Avatar de nouanda
    Homme Profil pro
    Hobbyist
    Inscrit en
    Mai 2002
    Messages
    246
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Localisation : Australie

    Informations professionnelles :
    Activité : Hobbyist

    Informations forums :
    Inscription : Mai 2002
    Messages : 246
    Par défaut
    Je pense que ce que tu cherches à faire tombe parfaitement dans l'utilisation d'un ThreadPool.
    Un tutoriel de François Dorin sur developpez.com.
    Et pour aller plus loin, la documentation officielle:

  3. #3
    Expert confirmé

    Avatar de François DORIN
    Homme Profil pro
    Consultant informatique
    Inscrit en
    Juillet 2016
    Messages
    2 761
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 41
    Localisation : France, Charente Maritime (Poitou Charente)

    Informations professionnelles :
    Activité : Consultant informatique
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Juillet 2016
    Messages : 2 761
    Billets dans le blog
    21
    Par défaut
    Bonjour,

    Petit complément de réponse. Effectivement, le pool de threads répondra parfaitement à ton besoin. Il est également possible de modifier le nombre de threads maximum, via la méthode ThreadPool.SetMaxThreads

  4. #4
    Expert confirmé
    Avatar de StringBuilder
    Homme Profil pro
    Chef de projets
    Inscrit en
    Février 2010
    Messages
    4 197
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 46
    Localisation : France, Rhône (Rhône Alpes)

    Informations professionnelles :
    Activité : Chef de projets
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Février 2010
    Messages : 4 197
    Billets dans le blog
    1
    Par défaut
    Merci pour vos réponses.

    Je suis un peu surpris par cette remarque dans la MSDN (et j'avoue, j'ai rien compris aux deux paramètres de SetMaxThreads) :

    You cannot set the maximum number of worker threads or I/O completion threads to a number smaller than the number of processors on the computer. To determine how many processors are present, retrieve the value of the Environment.ProcessorCount property. In addition, you cannot set the maximum number of worker threads or I/O completion threads to a number smaller than the corresponding minimum number of worker threads or I/O completion threads. To determine the minimum thread pool size, call the GetMinThreads method.
    Du coup, je suis obligé de laisser mon programme s'étaller comme un gros porc sur chaque processeur ?

    Moi, au contraire, je souhaiterais avoir la main pour obliger mon programme à certes, bosser dans plusieurs threads, mais laisser des ressources pour :
    1/ Le thread principal
    2/ Les autres applications qui tournent sur le serveur

    Du coup j'avoue que je suis relativement perplexe...

    Et sinon, y'a pas un "Wait()" sur le threadpool ?

    Une fois que j'ai GZippé les fichiers, j'ai d'autres traitements à faire (mais pas à commencer tant que tous les fichiers sont pas compressés).

    Je fais comment pour attendre que tous les Thread de mon ThreadPool soient terminés dans mon Main ?

  5. #5
    Expert confirmé

    Avatar de François DORIN
    Homme Profil pro
    Consultant informatique
    Inscrit en
    Juillet 2016
    Messages
    2 761
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 41
    Localisation : France, Charente Maritime (Poitou Charente)

    Informations professionnelles :
    Activité : Consultant informatique
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Juillet 2016
    Messages : 2 761
    Billets dans le blog
    21
    Par défaut
    Citation Envoyé par StringBuilder Voir le message
    Merci pour vos réponses.

    Je suis un peu surpris par cette remarque dans la MSDN (et j'avoue, j'ai rien compris aux deux paramètres de SetMaxThreads) :



    Du coup, je suis obligé de laisser mon programme s'étaller comme un gros porc sur chaque processeur ?
    Effectivement, je n'avais jamais fais attention à cette limitation. Il faudra que je creuse à l'occasion sur le pourquoi du comment.

    Citation Envoyé par StringBuilder Voir le message
    Moi, au contraire, je souhaiterais avoir la main pour obliger mon programme à certes, bosser dans plusieurs threads, mais laisser des ressources pour :
    1/ Le thread principal
    2/ Les autres applications qui tournent sur le serveur
    A te lire, j'ai l'impression que tu souhaites donner une priorité d'exécution inférieur à tes tâches. Dans ce cas, tu es obligé de créer manuellement des threads, car il n'est pas possible de modifier la priorité des threads du pool de thread.

    Citation Envoyé par StringBuilder Voir le message
    Et sinon, y'a pas un "Wait()" sur le threadpool ?
    Effectivement, il n'y a pas de Wait sur le pool de thread, car cela n'a pas de sens. Le thread principal (celui qui exécute un programme à son lancement) fait parti du pool de threads ! Et le pool de thread peut être utilisé dans de nombreuses circonstances, sans même que tu t'en doutes.

    Pour attendre les tâches en question, tu es obligé de récupérer l'instance de chaque Task créée pour faire un Task.WaitAll(task1, task2, ...). Si tu créés manuellement les Thread, alors il faut appeler Thread.Join pour attendre la fin d'un thread. Mais l'API fournie est moins pratique que celle avec les Tasks car il n'existe pas de JoinAll (normal, car Task est une abstraction supplémentaire basée sur une utilisation sous-jacente de Thread). Par contre, tu peux créer une Task qui attend un Thread, et réutiliser ainsi l'API des Tasks pour gérer les attentes.

    Citation Envoyé par StringBuilder Voir le message
    Une fois que j'ai GZippé les fichiers, j'ai d'autres traitements à faire (mais pas à commencer tant que tous les fichiers sont pas compressés).
    Ici, il suffit simplement d'utiliser Task.WhenAll au lieu de Task.WaitAll
    Citation Envoyé par StringBuilder Voir le message
    Je fais comment pour attendre que tous les Thread de mon ThreadPool soient terminés dans mon Main ?
    Impossible, comme le thread principal est lui-même sur le pool de thread.

  6. #6
    Expert confirmé
    Avatar de StringBuilder
    Homme Profil pro
    Chef de projets
    Inscrit en
    Février 2010
    Messages
    4 197
    Détails du profil
    Informations personnelles :
    Sexe : Homme
    Âge : 46
    Localisation : France, Rhône (Rhône Alpes)

    Informations professionnelles :
    Activité : Chef de projets
    Secteur : High Tech - Éditeur de logiciels

    Informations forums :
    Inscription : Février 2010
    Messages : 4 197
    Billets dans le blog
    1
    Par défaut
    Bon,

    Je pense que je suis loin d'avoir fait un truc génial, mais ça fait ce que je cherche à faire :
    Code csharp : Sélectionner tout - Visualiser dans une fenêtre à part
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
     
    using System;
    using System.IO;
    using System.IO.Compression;
    using System.Threading;
    using System.Collections.Generic;
     
    namespace TestThreads
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Attempt with threads");
     
                MyThreadPool pool = new MyThreadPool();
     
                string[] files = Directory.GetFiles(@"c:\in", "*.*", SearchOption.AllDirectories);
                for (int i = 0, cpt = files.Length; i < cpt; ++i)
                {
                    pool.Enqueue(files[i]);
                }
                pool.DoWork();
     
                Console.WriteLine("Main finished");
                Console.ReadKey(true);
            }
        }
     
        static class GZip
        {
            public static void CompressFile(string param)
            {
                FileInfo fileToCompress = new FileInfo(param);
                using (FileStream originalFileStream = fileToCompress.OpenRead())
                {
                    Console.WriteLine("Compressing {0}.", fileToCompress.FullName);
                    using (FileStream compressedFileStream = File.Create(fileToCompress.FullName + ".gz"))
                    {
                        using (GZipStream compressionStream = new GZipStream(compressedFileStream, CompressionLevel.Optimal))
                        {
                            originalFileStream.CopyTo(compressionStream);
                        }
                    }
                    FileInfo info = new FileInfo(fileToCompress.FullName + ".gz");
                    Console.WriteLine("Compressed {0} from {1} to {2} bytes.", fileToCompress.FullName, fileToCompress.Length.ToString(), info.Length.ToString());
                }
                fileToCompress.Delete();
            }
        }
     
        class MyThreadPool : Queue<String>
        {
            int MaxWorkers;
     
            List<Thread> Workers = null;
     
            public MyThreadPool() : this(Environment.ProcessorCount)
            {
            }
     
            public MyThreadPool(int maxWorkers)
            {
                if (maxWorkers < 1) maxWorkers = 1;
                MaxWorkers = maxWorkers;
                Workers = new List<Thread>(MaxWorkers);
     
                for (int i = 0; i < MaxWorkers; ++i)
                {
                    Workers.Add(null);
                }
            }
     
            public void DoWork()
            {
                while (Count > 0)
                {
                    for (int i = 0; i < MaxWorkers; ++i)
                    {
                        if (Workers[i] == null || Workers[i].ThreadState == ThreadState.Stopped || Workers[i].ThreadState == ThreadState.Aborted)
                        {
                            if (Count > 0)
                            {
                                Workers[i] = new Thread(DoJob);
                                Workers[i].Start(Dequeue());
                            }
                            else
                            {
                                break;
                            }
                        }
                    }
     
                    Console.WriteLine("Waits for a free worker");
                    Thread.Sleep(100);
                }
     
                for (int i = 0; i < MaxWorkers; ++i)
                {
                    if (Workers[i] != null && Workers[i].ThreadState == ThreadState.Running || Workers[i].ThreadState == ThreadState.WaitSleepJoin)
                    {
                        Console.WriteLine("Wait for the worker {0} to ends", i);
                        Workers[i].Join();
                    }
                    else if (Workers[i] != null && Workers[i].ThreadState != ThreadState.Stopped)
                    {
                        Console.WriteLine("Worker {0} is in a strange state : {1}", i, Workers[i].ThreadState);
                    }
                }
            }
     
            private void DoJob(object param)
            {
                GZip.CompressFile(param as string);
            }
        }
    }

    Toute suggestion est la bien venue...

    Notamment, j'aimerais bien rendre ma classe "MyThreadPool" générique, avec possibilité de passer en paramètre la méthode à lancer pour chaque thread...

Discussions similaires

  1. Comment faire une interface de ce type....
    Par SpiderAlpha dans le forum C++Builder
    Réponses: 6
    Dernier message: 30/04/2007, 13h50
  2. [NetBeans] Comment faire une applet ???
    Par goldbar dans le forum NetBeans
    Réponses: 3
    Dernier message: 30/05/2004, 13h52
  3. Réponses: 2
    Dernier message: 03/05/2004, 12h13
  4. [VB6] Comment faire une fonction qui renvoie 2 résultats
    Par tazarine dans le forum VB 6 et antérieur
    Réponses: 10
    Dernier message: 15/01/2004, 00h13
  5. Réponses: 10
    Dernier message: 10/10/2003, 14h25

Partager

Partager
  • Envoyer la discussion sur Viadeo
  • Envoyer la discussion sur Twitter
  • Envoyer la discussion sur Google
  • Envoyer la discussion sur Facebook
  • Envoyer la discussion sur Digg
  • Envoyer la discussion sur Delicious
  • Envoyer la discussion sur MySpace
  • Envoyer la discussion sur Yahoo