# -*- coding: utf-8 -*- # use Python threads # spot V2 and V3 import logging ## if __name__ == '__main__': ## logging.basicConfig(level=logging.DEBUG) log = logging.getLogger('sample') import sys PyVER = (sys.version_info.major, sys.version_info.minor) PyV3 = PyVER >= (3, 1) PyV2 = PyVER == (2, 7) assert not (PyV2 and PyV3), 'fatal: unsupported version %s' % '.'.join(int(x) for x in PyVER) log.debug('PyV2=%s, PyV3=%s' % (PyV2, PyV3)) del PyVER import threading import inspect import ctypes threading.all_done = threading.Event() class Thread(threading.Thread): """add ._async_raise and .terminate methods """ def _async_raise(self, type_): if not issubclass(type_, BaseException): raise TypeError('Only "Exception" types can be raised (no instance)') SetAsyncExc = ctypes.pythonapi.PyThreadState_SetAsyncExc rc = SetAsyncExc( ctypes.c_long(self.ident), ctypes.py_object(type_) ) if rc == 0: raise ValueError("invalid thread id") elif rc != 1: # if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect SetAsyncExc(ctypes.c_long(self.ident), None) raise SystemError("PyThreadState_SetAsyncExc failed") def terminate(self): log.debug('thread.terminate') if self.is_alive(): self._async_raise(SystemExit) import threading if PyV3: import queue elif PyV2: import Queue as queue class Queue(queue.Queue): def get(self): interval = 0.1 all_done = threading.all_done while True: try: rs = queue.Queue.get(self, timeout=interval) except queue.Empty: if all_done.is_set(): raise SystemExit else: break return rs import sys import code if PyV3: from io import StringIO elif PyV2: from io import BytesIO as StringIO class Interpreter(code.InteractiveConsole): _thread = None # don't recycle sio def runcode(self, code): """redirect outputs to StringIO""" std_sav = sys.stdout, sys.stderr sys.stdout = sys.stderr = sio = StringIO() sio.write = self.write try: exec (code, self.locals) except SystemExit: self.quit() except: self.showtraceback() finally: sio.close() sys.stdout, sys.stderr = std_sav def __init__(self, proxy, locals=None, filename=""): code.InteractiveConsole.__init__(self, locals, filename) self._inputQ = Queue(maxsize=1) self.post = proxy.post proxy.set_interpreter(self) # backlink for 'interpret_line' def interpret_line(self, line): log.debug('interpreter.interpret_line, /%s/' % line) self._inputQ.put(line) def write(self, data): log.debug('interpreter.write') self.post('write', data) def raw_input(self, prompt): log.debug('interpreter.raw_input') self.post('raw_input', prompt) data = self._inputQ.get() return data def quit(self): log.debug('interpreter.quit') self.post('quit') def interact(self, banner=None): """save current thread to make .abort happy""" self._thread = threading.current_thread() code.InteractiveConsole.interact(self, banner) log.debug('*** interact done') def abort(self): log.debug('interpreter.abort') tp = self._thread assert tp is not None tp._async_raise(KeyboardInterrupt) from PyQt4.QtCore import (Qt, QObject, QEvent, QCoreApplication) class AsyncCommand(QEvent): def __init__(self, func, args): QEvent.__init__(self, QEvent.User) self.data = (func, args) def __call__(self): func, args = self.data try: rs = func(*args) except Exception as e: log.exception('error calling %s\%s' % (func.__name__, e)) class QtProxy(QObject): def set_interpreter(self, obj): self._widget._interpreter = obj def __init__(self, widget): QObject.__init__(self) self._widget = widget self._postEvent = QCoreApplication.instance().postEvent def post(self, name, *args): log.debug('post: thread.ident = %d' % threading.current_thread().ident) log.debug('post(%s, ...)' % name) postEvent = self._postEvent w = self._widget command = AsyncCommand(getattr(w, name), args) postEvent(self, command) def event(self, command): log.debug('event: thread.ident = %d' % threading.current_thread().ident) if isinstance(command, AsyncCommand): command() return True else: return QObject.event(self, event) from PyQt4.QtCore import (Qt, pyqtSlot) from PyQt4.QtGui import (QTextEdit, QTextCursor, QApplication) class TextEdit(QTextEdit): _cursor = None _interpreter = None @property def cursor(self): if self._cursor is None: self._cursor = QTextCursor(self.document()) return self._cursor def _moveToEnd(self): self.cursor.movePosition(QTextCursor.End, QTextCursor.MoveAnchor) @pyqtSlot(str) def raw_input(self, prompt): log.debug('console.raw_input') self.write(prompt) self._start = self.cursor.position() @pyqtSlot(str) def write(self, data): log.debug('console.write, data=/%s/' % data) [ self.append((line)) for line in data.split('\n') if line ] self._moveToEnd() @pyqtSlot(str) def quit(self): log.debug('console.quit') self.close() def keyPressEvent(self, event): if event.key() in (Qt.Key_Return, Qt.Key_Enter): log.debug('console._on_eol') # maybe there is a clever way to get line to send cursor = self.cursor self._moveToEnd() eol = cursor.position() count = eol - self._start cursor.movePosition(QTextCursor.Left, QTextCursor.KeepAnchor, count) line = cursor.selection().toPlainText() cursor.movePosition(QTextCursor.Right, QTextCursor.KeepAnchor, count) if PyV2: line = unicode(line) if len(line) else unicode('') log.debug('interpret line: /%s/' % line) self._interpreter.interpret_line(line) elif event.key()==Qt.Key_C and ( event.modifiers() & Qt.ControlModifier): log.debug('widget.control-C') self._interpreter.abort() else: return QTextEdit.keyPressEvent(self, event) if __name__ == '__main__': app = QApplication(sys.argv) # setup widget, proxy, then console textEdit = TextEdit() proxy = QtProxy(textEdit) interpreter = Interpreter(proxy) # setup QThread task = Thread(target=interpreter.interact) textEdit.show() # show display task.start() # start interpreter app.exec_() # start event loop log.debug('terminating task') task.terminate() task.join() log.debug('exit')