Bonjour,

Je suis en train de replacer dans une application Python3.4.3 & PyQt5 un QThread par des opérations asynchrones a l'aide a la librairie asyncio.
Pour le moment ce que j'ai fait fonctionne, mais par contre la GUI reste bloquée pendant les traitements.
J'avoue aussi ne pas avoir complétement compris le "asyncio.sleep", en gros ce que j'en comprend c'est que c'est sensé relâcher l'event loop, durant un temps donné mais c'est tout.

Voici mon code :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
 
class TestsController:
 
    def got_result(self, callbac):
        print("ok")
 
    @log_method('log_tests')
    def run_worker(self, list_test):
        self.worker = TestsWorker(self.get_env(), list_test)
 
        self.loop = asyncio.get_event_loop()
        task = self.loop.create_task(self.worker.run_tests())
        task.add_done_callback(self.got_result)
        self.loop.run_until_complete(task)
        # self.loop.close()
 
 
class TestWorkerStatus(Enum):
    """Enumeration describing the current test thread status."""
 
    running = 0
    paused = 1
    stoped = 2
 
 
class TestsWorker(object):
 
    def __init__(self, env, test_list):
        self.env = env
        self.test_list = test_list
 
    @asyncio.coroutine
    def run_tests(self):
        logging.getLogger('log_tests').info('Running tests ... ')
        current_test = None
        application_state.status = _("Running tests ...")
        self.tests_worker_status = TestWorkerStatus.running
 
        for item in self.test_list:
            self.current_test_successful = False
            while not self.current_test_successful:
                # ... Handle other status
                if self.tests_worker_status == TestWorkerStatus.running:
                    if isinstance(item, File):
                        print("will run next test")
                        results = yield from self._run_test(item)
                        print("results : ", results)
                        # .... Handle result 
                        self.current_test_successful = True
        application_state.status = _("Tests finished ...")
        signal_manager.tests_finished.emit()
 
    @asyncio.coroutine
    def _run_test(self, item):
        current_test = None
        try:
            if isinstance(item, File):
                module = SourceFileLoader(item.name, item.path).load_module()
                test = module.Test(self.env)
                current_test = test
                self.update_test_progression(self.test_list, item, test._name)
                results = test.test()   #  < --- This is the long process 
                return results
        except TestException as test_ex:
                ... Exception handling
        except Exception as ex:
            current_test.add_result(False, "Exception : {0}".format(ex))
            self._handle_exception(ex, ex, current_test._result)
            return
 
    @log_method('log_tests')
    def update_test_progression(self, list_test, test, test_name):
        try:
            progression = str(list_test.index(test) + 1) + "/" + str(len(list_test))
            text = _("Test : {0},  Name : {1} ").format(progression, test_name)
            application_state.status = text
        except:
            raise
 
    @log_method('log_tests')
    def _handle_exception(self, exception, exception_text, current_test_result):
        try:
            signal_manager.result_test_received.emit(current_test_result)
            logging.getLogger('log_tests').error(exception)
            signal_manager.error_occured.emit(str(exception_text))
            application_state.status = _("An error occured during last test. : {0}").format(exception)
            signal_manager.test_exception_occured.emit()
            signal_manager.tests_finished.emit()
        except:
            raise
Voila comment je lance mes traitements. Ceux-ci sont exécuté. Le processus long se trouve dans la classe "Test" que je n'ai pas encore touché.
La classe test hérite de "TestCase", celle-ci contient quelques méthodes utiles aux tests, mais n'utilisent pas asyncio.
De plus, la classe "Test" peu faire appel à d'autres API qui ne sont pas asynchrone.

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 
from components.test_api.test_case import TestCase
from components.test_api.tests_fixtures import Fixtures
 
class Test(TestCase):
    """Generic test class."""
 
    def __init__(self, environement):
        """Initializer."""
        super(Test, self).__init__(environement)
        self._category = _("01 Software control")
        self._name = _("Cleaning EEPROM")
        self._index = 1
        self.results_names = [_("01 - Reset EEPROM")]
        self.available_in = ["production"]
 
    @Fixtures.inject_cpu_api
    def test(self):
        """Generic test method."""
        rez = self.cmd.clear_eeprom()
        value = rez == ['ANS_GENERAL', 'OK']
        return self.add_result(self.results_names[0], value, _("Reset EEPROM"))
Comment faire en sorte que ce code ne soit plus bloquant ?