[STL] Problème d’accès concurrent à un conteneur
Bonjour,
Pour un projet personnel, j'ai été amené à créer un classe gérant la répartition de callback sur plusieurs threads (nommé Task ou Callback selon le context d'éxécution voulu, n'importe quel thread ou principal respectivement).
Quand je configure mon programme pour qu'il utilise 2 threads (principal inclut), il plante (le lieu du plantage sera indiqué dans le code). Quand je ne met que un seul thread, tout fonctionne.
Visual Studio m'indique de plus avec une assertion que le problème est dû à un déférencement impossible d'un itérateur. J'ai essayé de changer le conteneur (queue au début, puis list, puis deque), cela ne change rien.
Le code :
TaskManager.hpp
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
#pragma once
#include <functional>
#include <vector>
#include <mutex>
#include <thread>
#include <deque>
#include "Semaphore.hpp"
class TaskManager
{
public:
TaskManager ();
~TaskManager ();
void pollCallback (unsigned int taskTimeLimit);
void setThreadNumber (unsigned int number);
inline void addCallback (const std::function <void (void)>& callback)
{
m_mutex.lock ();
m_callbackQueue.push_back (callback);
m_mutex.unlock ();
}
inline void addTask (const std::function <void (void)>& task)
{
m_mutex.lock ();
m_taskQueue.push_back (task);
m_semaphore.post ();
m_mutex.unlock ();
}
private:
std::deque <std::function <void (void)>> m_callbackQueue;
std::deque <std::function <void (void)> > m_taskQueue;
std::mutex m_mutex;
struct ThreadData
{
std::thread* thread;
bool quit;
};
std::vector <ThreadData> m_threadPool;
Semaphore m_semaphore;
}; |
TaskManager.cpp
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
#include "TaskManager.hpp"
#include <exception>
#include <ctime>
TaskManager::TaskManager ()
{
}
TaskManager::~TaskManager ()
{
setThreadNumber (1);
}
void TaskManager::pollCallback (unsigned int taskTimeLimit)
{
clock_t c = clock ();
m_mutex.lock ();
while (true)
{
while (!m_callbackQueue.empty ()) /* ### <<< Celui VS, l'erreur est ici quelque soit la méthode de débugage (pas à pas, arrêt lors de l'assertion ...) */
{
std::function <void (void)> f = m_callbackQueue.front ();
m_callbackQueue.pop_front ();
m_mutex.unlock ();
f ();
m_mutex.lock ();
}
if (m_taskQueue.empty () || (unsigned int) (clock () - c) / (CLOCKS_PER_SEC / 1000) > taskTimeLimit)
break;
if (m_semaphore.trywait ())
{
std::function <void (void)> f = m_taskQueue.front ();
m_taskQueue.pop_front ();
m_mutex.unlock ();
f ();
m_mutex.lock ();
}
}
m_mutex.unlock ();
}
void TaskManager::setThreadNumber (unsigned int number)
{
#ifdef _DEBUG
if (number == 0)
{
throw std::invalid_argument ("Can not set the program to have 0 threads");
}
#endif
number --;
if (number == m_threadPool.size ())
return;
else if (number > m_threadPool.size ())
{
const unsigned int last = m_threadPool.size ();
m_threadPool.resize (number);
for (unsigned int i = last ; i < number ; i++)
{
ThreadData& current = m_threadPool [i];
current.quit = false;
current.thread = new std::thread ([this, ¤t] ()
{
while (!current.quit)
{
m_mutex.lock ();
while (m_semaphore.trywait ())
{
std::function <void (void)> f = m_taskQueue.front ();
m_taskQueue.pop_front ();
m_mutex.unlock ();
f ();
m_mutex.lock ();
}
m_mutex.unlock ();
std::unique_lock <std::mutex> lock (m_mutex);
m_semaphore.wait (lock);
}
});
}
}
else
{
for (unsigned int i = number ; i < m_threadPool.size () ; i++)
{
ThreadData& current = m_threadPool [i];
current.quit = true;
delete current.thread;
}
m_threadPool.resize (number);
}
} |
Semaphore.hpp
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
#pragma once
#include <condition_variable>
#include <mutex>
class Semaphore
{
public:
Semaphore ()
{
}
~Semaphore ()
{
m_cond.notify_all ();
}
inline void post ()
{
m_counter++;
m_cond.notify_one ();
}
inline void wait (std::unique_lock <std::mutex>& lock)
{
while (!m_counter)
m_cond.wait (lock);
m_counter --;
return;
}
inline bool trywait ()
{
bool ok = m_counter != 0;
if (ok)
m_counter --;
return ok;
}
private:
unsigned int m_counter;
std::condition_variable m_cond;
}; |
Merci d'avance,