1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
import logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger('test')
import time
from threading import Thread, Event, Lock
# WorkQ defines an interface.
class WorkQ(list):
    """FIFO queue of ``(target, args, kwargs)`` work entries.

    The queue is a ``list`` subclass: entries are appended at the end by
    :meth:`push` and taken from the front by :meth:`pop`.
    """

    def pop(self):
        """Remove and return the oldest entry, or ``None`` when empty."""
        if self:
            return super().pop(0)
        return None

    def push(self, target, args=(), kwargs=None):
        """Append one work entry to the end of the queue.

        Args:
            target: callable to queue.
            args: positional arguments stored alongside ``target``.
            kwargs: keyword arguments stored alongside ``target``;
                ``None`` (the default) stores a fresh empty dict.
        """
        # A ``{}`` default would be a single dict object shared by every
        # entry pushed without keyword arguments; build one per entry.
        if kwargs is None:
            kwargs = {}
        super().append((target, args, kwargs))

    @property
    def size(self):
        """Number of entries currently queued."""
        return len(self)
# Every value a Processor's ``state`` attribute can take, plus one named
# constant per value.
CPU_STATES = ('idle', 'run', 'paused', 'shut')
IDLE, RUN, PAUSED, SHUT = CPU_STATES
class Processor(Thread):
    """Worker thread that runs the entries of one work queue, in order.

    The thread pops ``(target, args, kwargs)`` entries from its queue and
    calls them one at a time. It can be paused and resumed between
    entries, and it shuts itself down once the queue yields nothing.
    ``state`` always holds one of the ``CPU_STATES`` values.
    """

    number = 0  # class-wide count of processors created so far

    def get_number(self):
        """Increment the class-wide counter and return its new value."""
        Processor.number += 1
        return Processor.number

    def __init__(self, workq):
        """Create an idle processor bound to ``workq``.

        Args:
            workq: queue object whose ``pop()`` returns either a
                ``(target, args, kwargs)`` tuple or a falsy value when
                nothing is left to run.
        """
        # The instance attribute shadows the class counter with this
        # processor's own id.
        self.number = self.get_number()
        self.workq = workq
        super().__init__()
        self._run = Event()   # set while a pause is requested
        self._shut = Event()  # set once shutdown has been requested
        self.state = IDLE
        log.debug('created %s' % self)

    def _do_work(self):
        """Run the next queued entry, or request shutdown if none is left."""
        work = self.workq.pop()
        if not work:
            self.shut()
            return
        target, args, kwds = work
        log.debug('CPU %d starting "%s(%s)"' % (self.number,
                  target.__name__, ', '.join(str(x) for x in args)))
        try:
            target(*args, **kwds)
        except Exception:
            # Must name the class: `except Exception()` names an instance,
            # which raises TypeError while matching and kills the thread
            # instead of logging the failure.
            log.exception('error on "%s"' % target.__name__)

    def run(self):
        """Thread body: process entries until shutdown is requested."""
        self.state = RUN
        log.debug('%s' % self)
        while not self._shut.is_set():
            if self._run.is_set():
                self.state = PAUSED
                log.debug('%s' % self)
                # Poll every 0.5 s. Waiting on `_run` itself would return
                # immediately (it is set while paused) and spin the CPU;
                # waiting on `_shut` sleeps and still wakes at once on
                # shutdown. Checking `_shut` here also prevents a shutdown
                # that raced with the pause from being missed.
                while self._run.is_set() and not self._shut.is_set():
                    self._shut.wait(timeout=0.5)
                    log.debug('%s' % self)
                self.state = RUN
            else:
                self._do_work()
        self.state = SHUT
        log.debug('%s' % self)

    def pause(self, wait=False):
        """Request a pause; it takes effect between two work entries.

        Args:
            wait: when true, block until the thread reports ``PAUSED``
                (or ``SHUT``).
        """
        log.debug('CPU %d - pausing ' % (self.number))
        self._run.set()
        if wait:
            log.debug('.pause waiting %s' % self)
            self.wait(PAUSED)

    def resume(self):
        """Cancel a pause request so the thread continues with its queue."""
        log.debug('CPU %d - resuming' % (self.number))
        self._run.clear()

    def shut(self, wait=False):
        """Request shutdown; the entry currently running is not interrupted.

        Args:
            wait: when true, block until the thread reports ``SHUT``.
        """
        log.debug('CPU %d - shutting down' % (self.number))
        self._shut.set()
        # Compare by value: identity of equal strings is not guaranteed.
        if self.state == PAUSED:
            self.resume()
        if wait:
            log.debug('.shut waiting %s' % self)
            self.wait(SHUT)

    def wait(self, state):
        """Block, polling every 0.5 s, until ``state`` or ``SHUT`` is reached.

        Args:
            state: one of ``CPU_STATES``.
        """
        assert state in CPU_STATES
        while self.state not in (state, SHUT):
            log.debug('wait cpu %s for %s' % (state, self))
            time.sleep(0.5)

    def __str__(self):
        """Return the processor id and its current state."""
        return 'CPU %d - state: %s' % (self.number, self.state)
if __name__ == '__main__':
    # Demo: run two suites on two processors, pause the first one,
    # serialize its remaining work, shut it down, then reload the work
    # into a brand-new processor while the second one keeps running.
    import pickle  # to serialize suites...

    # NOTE: test_case and TestSuite stay at module level (directly under
    # this `if`, not inside a function) because pickle stores functions
    # and classes by qualified name and must be able to look them up
    # again when the suite is reloaded.

    # mock test case as "callables"
    def test_case(name, counter):
        """Print a countdown from ``counter`` to 1, sleeping 0.2 s per step."""
        while counter > 0:
            print('%s: counter= %d' % (name, counter))
            counter -= 1
            time.sleep(0.2)

    # mock test suite as WorkQ
    class TestSuite(WorkQ):
        """Named work queue standing in for a test suite."""

        def __init__(self, name):
            super().__init__()
            self.name = name

        def __str__(self):
            return 'TestSuite(%s), left: %d' % (self.name, self.size)

    # build two test suites of three cases each
    suites = [TestSuite('suite-%d' % x) for x in range(2)]
    for s in suites:
        for case_no in range(3):
            s.push(test_case, ('%s, case: %d' % (s.name, case_no), 5))

    log.info('creating processors')
    cpu = [Processor(w) for w in suites]
    for worker in cpu:
        worker.start()

    # our victim
    p = cpu[0]
    log.info('pausing cpu 0')
    p.pause(wait=True)

    # fetch workq
    wq = p.workq
    log.info('dumping state: %s' % wq.name)
    wq_state = pickle.dumps(wq)

    log.info('shutting down CPU %d' % p.number)
    p.shut(wait=True)
    p.join()  # make sure the old thread is gone before replacing it
    cpu.remove(p)

    log.info('pausing %s' % wq.name)
    count = 100          # show that other tasks are
    delay = 2.0 / count  # running while we're pausing.
    for _ in range(count):
        time.sleep(delay)

    log.info('reloading state')
    wq = pickle.loads(wq_state)
    log.info('reloaded state: %s' % wq.name)

    log.info('creating new CPU/thread')
    p = Processor(wq)
    cpu.append(p)
    p.start()

    log.info('restarting...')
    p.wait(RUN)

    log.info('waiting completion')
    for worker in cpu:
        worker.join()
Partager