1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| template <typename T, class Compare>
class ThreadSafePriorityQueue
{
private:
typedef std::priority_queue<T, std::deque<T>, Compare> priority_queue;
priority_queue m_queue;
size_t m_max_size;
size_t m_min_pop_size;
std::mutex m_mutex;
std::condition_variable m_cvPush;
std::condition_variable m_cvPop;
public:
ThreadSafePriorityQueue(const Compare &cmp, const size_t max_size, const size_t min_pop_size=1):
m_queue(cmp), m_max_size(max_size), m_min_pop_size(min_pop_size)
{
}
~ThreadSafePriorityQueue()
{
}
void push(const T &v) noexcept
{
std::unique_lock<std::mutex> lock( m_mutex );
// Wait until there is space in the queue.
while( m_queue.size() == m_max_size )
{
m_cvPush.wait( lock );
}
// Push to queue.
m_queue.push(v);
// Wake up one popping thread.
if( m_queue.size() >= m_min_pop_size )
{
m_cvPop.notify_one();
}
}
void push(T &&v) noexcept
{
std::unique_lock<std::mutex> lock( m_mutex );
// Wait until there is space in the queue.
while( m_queue.size() == m_max_size )
{
m_cvPush.wait( lock );
}
// Push to queue.
m_queue.push(v);
// Wake up one popping thread.
if( m_queue.size() >= m_min_pop_size )
{
m_cvPop.notify_one();
}
}
T pop() noexcept
{
std::unique_lock<std::mutex> lock( m_mutex );
// If there is no item then we wait until there is one.
while( m_queue.empty() )
{
m_cvPop.wait( lock );
}
// If we reach here then there is an item, get it.
T ret = m_queue.top();
m_queue.pop();
// Wake up one pushing thread.
m_cvPush.notify_one();
return ret;
}
size_t size() noexcept
{
std::lock_guard<std::mutex> lock( m_mutex );
return m_queue.size();
}
bool empty() noexcept
{
std::lock_guard<std::mutex> lock( m_mutex );
return m_queue.empty();
}
void set_max_size(const size_t max_size) noexcept
{
std::lock_guard<std::mutex> lock( m_mutex );
m_max_size = max_size;
}
void set_min_pop_size(const size_t min_pop_size) noexcept
{
std::lock_guard<std::mutex> lock( m_mutex );
m_min_pop_size = min_pop_size;
}
void flush(const std::function<void(T&)> &func = [](){}) noexcept
{
std::lock_guard<std::mutex> lock( m_mutex );
while( !m_queue.empty() )
{
T ret = m_queue.top();
func( ret );
m_queue.pop();
}
// Wake up one pushing thread.
m_cvPush.notify_one();
}
}; |
Partager