Bonjour tout le monde,

Je fais un serveur websocket à l'aide de Tornado (6.x) pour que plusieurs clients puissent lancer des calculs et recevoir en "temps réel" les résultats.

Certains de ces calculs sont longs et bloquant. Pour palier à ce problème j'utilise des threads mais je ne suis pas certains que ça soit la meilleurs méthodes pour gérer la concurrence.

Voici un example de mon petit serveur:

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from tornado import ioloop, websocket, web, concurrent, gen
from tornado import ioloop, websocket, web, concurrent, gen
import os
import datetime
import time
import logging
import signal
import sys
import asyncio
import json
import threading
import numpy as np
 
if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
 
cl = []
 
class SocketHandler(websocket.WebSocketHandler):
 
    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        self.executor = concurrent.futures.ThreadPoolExecutor(8)
        self.io_loop = ioloop.IOLoop.current()
 
    def check_origin(self, origin):
        return True
 
    def open(self):
        if self not in cl:
            print("Create new connexion: {0}".format(self))
            cl.append(self)
 
    def on_close(self):
        if self in cl:
            print("Close connexion: {0}".format(self))
            cl.remove(self)
 
    def on_message(self, message):
        if self in cl:
            index = cl.index(self)
            client_buffer = cl[index]
            self.write_message("recieved message start infinite loop")
            self.executor.submit(execute, self.io_loop, self.send_message)
 
 
    def send_message(self, msg):
        try:
            if self in cl:
                self.write_message(message=msg)
                return True
        except ValueError as e:
            print(e)
        return False
 
# Fonction bloquante (pas du tout optimisé exprès pour l'example)
def execute(loop, msg_callback):
    print("execute")
    i = 0
    arr1 = np.rand(512, 512, 200)
    arr2 = np.rand(512, 512, 200)
    num = 0
    for k in range(512):
        for j in range(512):
            for i in range(200):
                num += arr1[k][j][i] * arr2[k][j][i]
    loop.add_callback(msg_callback, "num: {0}".format(num))
 
class MyApplication(web.Application):
    is_closing = False
 
    def signal_handler(self, signum, frame):
        logging.info('exiting...')
        self.is_closing = True
 
    def try_exit(self):
        if self.is_closing:
            # clean up here
            ioloop.IOLoop.instance().stop()
            logging.info('exit success')
 
 
app = MyApplication([
    (r'/', SocketHandler)
])
 
if __name__ == "__main__":
    signal.signal(signal.SIGINT, app.signal_handler)
    app.listen(1234)
    print("Server open on port: {0}".format(1234))
    ioloop.PeriodicCallback(app.try_exit, 10).start()
    ioloop.IOLoop.instance().start()
J'ai essayé avec des fonctions asynchrones mais je n'arrive même pas à me connecter avec plusieurs clients si un autre client a déjà lancé la fonction bloquante.

Le problème de cette méthode c'est que je ne peux pas interrompre le thread (ou je ne sais pas comment faire) et donc utilise des ressources inutilement.

Je pensais à la base que Tornado géré automatiquement les connexions multiples avec un process par client, mais apparemment non.

Quelqu'un a-t-il des conseils pour régler ce genre de problème? Peut être un autre serveur Websocket?

Merci