1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| import select
import types
import collections
# Object that represents a running task
class Task(object):
    """A running task: a coroutine plus the state needed to resume it.

    Attributes:
        target:  the generator currently being driven.
        sendval: value passed to ``target.send()`` on the next ``run()``.
        stack:   suspended calling generators, innermost last.
    """

    def __init__(self, target):
        self.target = target    # A coroutine
        self.sendval = None     # Value to send when resuming
        self.stack = []         # Call stack

    def run(self):
        """Advance the task by one step.

        Returns:
            The yielded object if it is a ``SystemCall`` instance,
            otherwise ``None``.

        Raises:
            StopIteration: the current generator finished and there is no
                suspended caller left on ``stack``.
        """
        try:
            result = self.target.send(self.sendval)
            if isinstance(result, SystemCall):
                return result
            if isinstance(result, types.GeneratorType):
                # A yielded generator is treated as a sub-coroutine call:
                # suspend the current generator and start driving the new one.
                self.stack.append(self.target)
                self.sendval = None
                self.target = result
            else:
                # A plain value is a "return value" for the suspended caller.
                # With no caller on the stack there is nobody to hand it to.
                # (The original tested ``self.task``, an attribute that does
                # not exist, so this branch always raised AttributeError.)
                if not self.stack:
                    return
                self.sendval = result
                self.target = self.stack.pop()
        except StopIteration:
            # The current generator ended; propagate only when it was the
            # outermost one, otherwise resume its caller with no value.
            if not self.stack:
                raise
            self.sendval = None
            self.target = self.stack.pop()
# Object that represents a "system call"
class SystemCall(object):
    """Base class for the objects a task yields to request scheduler work."""

    def handle(self, sched, task):
        """Carry out the request; the base implementation does nothing.

        Args:
            sched: the scheduler processing the request.
            task:  the task that yielded this object.
        """
        return None
# Implementation of different system calls
class ReadWait(SystemCall):
    """System call that registers the calling task as a read-waiter on a file."""

    def __init__(self, f):
        # Any object exposing fileno() works; only fileno() is used.
        self.f = f

    def handle(self, sched, task):
        """Hand ``task`` to ``sched.readwait`` keyed by the file's descriptor."""
        descriptor = self.f.fileno()
        sched.readwait(task, descriptor)
class WriteWait(SystemCall):
    """System call that registers the calling task as a write-waiter on a file."""

    def __init__(self, f):
        # Any object exposing fileno() works; only fileno() is used.
        self.f = f

    def handle(self, sched, task):
        """Hand ``task`` to ``sched.writewait`` keyed by the file's descriptor."""
        descriptor = self.f.fileno()
        sched.writewait(task, descriptor)
class NewTask(SystemCall):
    """System call that asks the scheduler to start another coroutine."""

    def __init__(self, target):
        # The coroutine to be launched as a new task.
        self.target = target

    def handle(self, sched, task):
        """Create the new task, then put the requesting task back on the queue."""
        sched.new(self.target)
        # The caller is not blocked by this request, so it is rescheduled.
        sched.schedule(task)
# Scheduler object
class Scheduler(object):
    """Runs tasks from a FIFO queue and parks tasks waiting on file descriptors.

    Attributes:
        task_queue:    deque of tasks that are ready to run.
        read_waiting:  maps a file descriptor to the task waiting to read it.
        write_waiting: maps a file descriptor to the task waiting to write it.
        numtasks:      number of tasks created via ``new`` and not yet finished.
    """

    def __init__(self):
        self.task_queue = collections.deque()
        self.read_waiting = {}
        self.write_waiting = {}
        self.numtasks = 0

    def new(self, target):
        """Wrap coroutine ``target`` in a Task, queue it and count it."""
        self.schedule(Task(target))
        self.numtasks += 1

    def schedule(self, task):
        """Append ``task`` to the end of the ready queue."""
        self.task_queue.append(task)

    def readwait(self, task, fd):
        """Record ``task`` as the waiter for readability of descriptor ``fd``."""
        self.read_waiting[fd] = task

    def writewait(self, task, fd):
        """Record ``task`` as the waiter for writability of descriptor ``fd``."""
        self.write_waiting[fd] = task

    def mainloop(self, count=-1, timeout=None):
        """Run tasks until none remain or ``count`` passes have completed.

        Args:
            count:   when positive, it is decremented after each pass over the
                     ready queue and the loop returns once it reaches zero;
                     the default of -1 never reaches zero.
            timeout: passed to ``select.select`` when no task is ready to run.
        """
        while self.numtasks:
            # Check for I/O events to handle
            if self.read_waiting or self.write_waiting:
                # Only poll (timeout 0) when there is runnable work queued.
                wait = timeout
                if self.task_queue:
                    wait = 0
                readable, writable, _ = select.select(
                    self.read_waiting, self.write_waiting, [], wait)
                for fd in readable:
                    self.schedule(self.read_waiting.pop(fd))
                for fd in writable:
                    self.schedule(self.write_waiting.pop(fd))

            # Run all the tasks on the queue that are ready to run
            while self.task_queue:
                current = self.task_queue.popleft()
                try:
                    outcome = current.run()
                    if not isinstance(outcome, SystemCall):
                        self.schedule(current)
                    else:
                        outcome.handle(self, current)
                except StopIteration:
                    # The task ran to completion.
                    self.numtasks -= 1

            # The ready queue is drained: decide whether to go round again.
            if count > 0:
                count -= 1
            if count == 0:
                return
if __name__ == '__main__':
    from socket import socket, AF_INET, SOCK_STREAM

    def time_server(address):
        """Coroutine that answers each connection on ``address`` with the time.

        Yields ``ReadWait`` before accepting and ``WriteWait`` before sending,
        so the scheduler resumes it only when the sockets are reported ready.
        """
        import time
        s = socket(AF_INET, SOCK_STREAM)
        s.bind(address)
        s.listen(5)
        while True:
            yield ReadWait(s)
            conn, addr = s.accept()
            try:
                print("Connection from %s " % str(addr))
                yield WriteWait(conn)
                resp = time.ctime() + "\r\n"
                # sendall keeps writing until the whole reply is sent;
                # send() may transmit only part of the buffer.
                conn.sendall(resp.encode('latin-1'))
            finally:
                # Close the connection even if printing or sending fails.
                conn.close()

    sched = Scheduler()
    sched.new(time_server(('', 10000)))  # Server on port 10000
    sched.new(time_server(('', 11000)))  # Server on port 11000
    sched.mainloop()
Partager