Bonjour,

Je suis en train d'implémenter un module de lecture/écriture optimisé permettant à la lecture, le traitement et l'écriture ligne par ligne, de se dérouler en parallèle : des threads de lecture remplissent ligne par ligne chacun une FIFO, qui sont lues par un autre thread de traitement qui remplit avec les résultats une FIFO de sortie, à partir de laquelle le fichier de sortie sera écrit par un autre thread d'écriture.

Pour le test ci-dessous, je n'ai fait qu'une lecture, et le traitement se résume à une simple copie. Je me trouve face aux problèmes suivants :
  • Sous Windows et Linux : le programme ne se termine pas 3 fois sur 5 (les chances de réussite augmentent bizarrement avec le nombre d'opérations entre les deux FIFO), et génère de temps à autres (inexcusable quand même) une SIGSEV.
  • Sous Linux, j'ai droit à du SIGABRT à la création du thread, et après le thread à "memory clobbered before allocated block" (soit une autre SIGABRT) aléatoirement dans les quinze premières lignes.


Croyez bien que j'ai HORREUR de poser mon problème et de vous inviter à réfléchir mais j'ai épuisé mes ressources... Je m'en excuse donc d'avance, en espérant une réponse qui me sortirait d'un beau pétrin.

Main.cpp :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "io.hpp"
 
#include <iostream>
 
int main(int argc, char *argv[]) {
    CSV::Reader reader;
    CSV::Writer writer;
 
    if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) {
        CSV::Row row;
 
        reader.run(); //Reads the CSV file and fills the read queue
        writer.run(); //Reads the to-be-written queue and writes it to a txt file
 
        //The loop is supposed to end only if the reader is finished and empty
        while(!(reader.is_finished() && reader.empty())) {
            //Transfers line by line from the read to the to-be-written queues
            reader.wait_and_pop(row);
            writer.push(row);
        }
        //The reader will likely finish before the writer, so he has to finish his queue before continuing.
        writer.finish(); 
    }
    else {
        std::cout << "File error";
    }
 
    return EXIT_SUCCESS;
}
Io.hpp :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#ifndef IO_H_INCLUDED
#define IO_H_INCLUDED
 
#include "threads.hpp"
 
#include <fstream>
 
namespace CSV {
    class Row {
        std::vector<std::string> m_data;
 
        friend class Iterator;
        friend void write_row(Row const &row, std::ostream &stream);
 
        void read_next(std::istream& csv);
 
        public:
            inline std::string const& operator[](std::size_t index) const {
                return m_data[index];
            }
            inline std::size_t size() const {
                return m_data.size();
            }
    };
 
    /** Reading *************************************************************************/
 
    class Iterator {
        public:
            Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
                ++(*this);
            }
            Iterator() : m_csv(NULL) {}
 
            //Pre-Increment
            Iterator& operator++() {
                if (m_csv != NULL) {
                    m_row.read_next(*m_csv);
                    m_csv = m_csv->good() ? m_csv : NULL;
                }
 
                return *this;
            }
            inline Row const& operator*() const {
                return m_row;
            }
 
            inline bool operator==(Iterator const& rhs) {
                return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
            }
            inline bool operator!=(Iterator const& rhs) {
                return !((*this) == rhs);
            }
        private:
            std::istream* m_csv;
            Row m_row;
    };
 
    class Reader : public Concurrent_queue<Row>, public Thread {
        std::ifstream m_csv;
 
        Thread_safe_value<bool> m_finished;
 
        void work() {
            if(!!m_csv) {
                for(Iterator it(m_csv) ; it != Iterator() ; ++it) {
                    push(*it);
                }
                m_finished.set(true);
            }
        }
 
    public:
        Reader() {
            m_finished.set(false);
        }
 
        inline bool open(std::string path) {
            m_csv.open(path.c_str());
 
            return !!m_csv;
        }
 
        inline bool is_finished() {
            return m_finished.get();
        }
    };
 
    /** Writing ***************************************************************************/
 
    void write_row(Row const &row, std::ostream &stream);
 
    //Is m_finishing really thread-safe ? By the way, is it mandatory ?
    class Writer : public Concurrent_queue<Row>, public Thread {
        std::ofstream m_csv;
 
        Thread_safe_value<bool> m_finishing;
 
        void work() {
            if(!!m_csv) {
                CSV::Row row;
 
                while(!(m_finishing.get() && empty())) {
                    wait_and_pop(row);
                    write_row(row, m_csv);
                }
            }
        }
 
    public:
        Writer() {
            m_finishing.set(false);
        }
 
        inline void finish() {
            m_finishing.set(true);
            catch_up();
        }
 
        inline bool open(std::string path) {
            m_csv.open(path.c_str());
 
            return !!m_csv;
        }
    };
}
 
#endif
Io.cpp :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "io.hpp"
 
#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>
 
void CSV::Row::read_next(std::istream& csv) {
    std::string row;
    std::getline(csv, row);
 
    boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
    m_data.assign(tokenizer.begin(), tokenizer.end());
}
 
void CSV::write_row(Row const &row, std::ostream &stream) {
    std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
    stream << std::endl;
}
Threads.hpp :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#ifndef THREADS_HPP_INCLUDED
#define THREADS_HPP_INCLUDED
 
#include <boost/bind.hpp>
#include <boost/thread.hpp>
 
class Thread {
protected:
    boost::thread *m_thread;
 
    virtual void work() = 0;
 
    void do_work() {
        work();
    }
 
public:
    Thread() : m_thread(NULL) {}
    virtual ~Thread() {
        catch_up();
        if(m_thread != NULL) {
            delete m_thread;
        }
    }
 
    inline void catch_up() {
        if(m_thread != NULL) {
            m_thread->join();
        }
    }
 
    void run() {
        m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
    }
};
 
/** Thread-safe datas **********************************************************/
 
#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
 
template <class T>
class Thread_safe_value : public boost::noncopyable {
    T m_value;
    boost::mutex m_mutex;
 
    public:
        T const &get() {
            boost::mutex::scoped_lock lock(m_mutex);
            return m_value;
        }
        void set(T const &value) {
            boost::mutex::scoped_lock lock(m_mutex);
            m_value = value;
        }
};
 
template<typename Data>
class Concurrent_queue {
    std::queue<Data> m_queue;
    mutable boost::mutex m_mutex;
    boost::condition_variable m_cond;
 
public:
    void push(Data const& data) {
        boost::mutex::scoped_lock lock(m_mutex);
        m_queue.push(data);
        lock.unlock();
        m_cond.notify_one();
    }
 
    bool empty() const {
        boost::mutex::scoped_lock lock(m_mutex);
        return m_queue.empty();
    }
 
    void wait_and_pop(Data& popped) {
        boost::mutex::scoped_lock lock(m_mutex);
        while(m_queue.empty()) {
            m_cond.wait(lock);
        }
 
        popped = m_queue.front();
        m_queue.pop();
    }
};
 
#endif // THREAD_HPP_INCLUDED
D'avance, merci.

Cordialement,

Kidpaddle2.

P.S: Concurrent_queue est le seul élément qui n'est pas de moi : http://www.justsoftwaresolutions.co....variables.html. J'ai confiance en son efficacité.