1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
# Uses Qt4 threads (QThread) to run the interpreter off the GUI thread.
# Detects whether we are running on Python 2 or Python 3.
import logging
## To see the debug trace when running as a script, enable:
## if __name__ == '__main__':
##     logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('sample')

import sys

# Interpreter-version flags; the rest of the module uses them to pick
# version-specific imports and string handling.
PyVER = (sys.version_info.major, sys.version_info.minor)
PyV3 = PyVER >= (3, 1)
PyV2 = PyVER == (2, 7)
# The two flags are mutually exclusive, so the meaningful check is that at
# least one of them is set.  The version components are ints and must be
# converted to str before joining them into the message.
assert PyV2 or PyV3, (
    'fatal: unsupported version %s' % '.'.join(str(x) for x in PyVER)
)
del PyVER
from PyQt4.QtCore import (QObject, pyqtSignal, QCoreApplication, QThread)
import sys
import code
# Choose the in-memory stream and queue classes matching the running
# interpreter (the PyV2 / PyV3 flags are mutually exclusive).
if PyV2:
    from io import BytesIO as StringIO
    from Queue import Queue
elif PyV3:
    from io import StringIO
    from queue import Queue
class Interpreter(code.InteractiveConsole, QObject):
    """Interactive console that talks to a text widget through Qt signals.

    Output and prompts are forwarded to the widget by emitting signals;
    input lines come back through a one-slot queue that ``interpret_line``
    fills and ``raw_input`` drains.  ``interact`` refuses (by assertion) to
    run on the application's own thread.
    """

    # Signals connected to the widget's slots in __init__.
    on_write = pyqtSignal(str)      # text written by the console
    on_raw_input = pyqtSignal(str)  # prompt: the console wants a line
    on_quit = pyqtSignal()          # executed code raised SystemExit

    def __init__(self, textEdit, locals=None, filename="<console>"):
        """Create the console and wire it to *textEdit*.

        *textEdit* must provide ``write``, ``raw_input`` and ``quit``
        callables; its ``_interpreter`` attribute is set to this object.
        *locals* and *filename* are passed to ``code.InteractiveConsole``.
        """
        code.InteractiveConsole.__init__(self, locals, filename)
        QObject.__init__(self)
        # Single-slot hand-off: raw_input() blocks on it until a line is put.
        self._inputQ = Queue(maxsize=1)
        # weaving
        self.on_write.connect(textEdit.write)
        self.on_raw_input.connect(textEdit.raw_input)
        self.on_quit.connect(textEdit.quit)
        textEdit._interpreter = self

    def runcode(self, code):
        """Execute *code* in ``self.locals`` with stdout/stderr redirected.

        While the code runs, ``sys.stdout`` and ``sys.stderr`` point to a
        stream whose ``write`` is replaced by ``self.write``.  ``SystemExit``
        triggers ``quit()``; any other exception is reported through
        ``showtraceback()``.  The original streams are always restored.
        """
        std_sav = sys.stdout, sys.stderr
        # don't recycle sio: a fresh stream per call, closed in ``finally``
        sys.stdout = sys.stderr = sio = StringIO()
        sio.write = self.write
        try:
            exec(code, self.locals)
        except SystemExit:
            self.quit()
        except:
            # Deliberately broad: everything else is shown as a traceback.
            self.showtraceback()
        finally:
            sio.close()
            sys.stdout, sys.stderr = std_sav

    def interpret_line(self, line):
        """Hand *line* to the console; blocks while the one-slot queue is full."""
        log.debug('proxy.interpret_line, /%s/', line)
        self._inputQ.put(line)

    def write(self, data):
        """Forward *data* to the widget by emitting ``on_write``."""
        log.debug('proxy.write')
        self.on_write.emit(data)

    def raw_input(self, prompt=""):
        """Emit ``on_raw_input`` with *prompt*, then block until a line arrives.

        *prompt* defaults to the empty string, like the base-class method.
        """
        log.debug('proxy.raw_input')
        self.on_raw_input.emit(prompt)
        return self._inputQ.get()

    def quit(self):
        """Signal the widget, via ``on_quit``, that the session should end."""
        log.debug('proxy.quit')
        self.on_quit.emit()

    def interact(self, banner=None):
        """Run the console loop; must not be called on the application's thread."""
        me = QThread.currentThread()
        app = QCoreApplication.instance().thread()
        log.debug('interact on thread: %s', me)
        log.debug('QApp on thread: %s', app)
        # raw_input() blocks, so running here on the GUI thread is refused.
        assert me is not app
        code.InteractiveConsole.interact(self, banner)  # PyV2&V3
from PyQt4.QtCore import (Qt, pyqtSlot)
from PyQt4.QtGui import (QTextEdit, QTextCursor, QApplication)
class TextEdit(QTextEdit):
    """Text widget used as console display and line editor.

    Text and prompts arrive through the ``write`` and ``raw_input`` slots.
    When Return/Enter is pressed, the text between the end of the last
    prompt and the end of the document is sent to the attached interpreter.
    """

    _cursor = None       # private QTextCursor, created lazily by ``cursor``
    _interpreter = None  # object receiving the lines (``interpret_line``)
    _start = None        # document position where the current input begins

    @property
    def cursor(self):
        """QTextCursor on this widget's document, created on first access."""
        if self._cursor is None:
            self._cursor = QTextCursor(self.document())
        return self._cursor

    def _moveToEnd(self):
        """Move the private cursor (and its anchor) to the document end."""
        self.cursor.movePosition(QTextCursor.End, QTextCursor.MoveAnchor)

    @pyqtSlot(str)
    def raw_input(self, prompt):
        """Show *prompt* and remember where the user's input starts."""
        log.debug('console.raw_input')
        self.write(prompt)
        self._start = self.cursor.position()

    @pyqtSlot(str)
    def write(self, data):
        """Append every non-empty line of *data*, then move to the end."""
        log.debug('console.write, data=/%s/', data)
        for line in data.split('\n'):
            if line:
                self.append(line)
        self._moveToEnd()

    # The slot takes no argument, so it is declared without one.
    @pyqtSlot()
    def quit(self):
        """Close the widget."""
        log.debug('console.quit')
        self.close()

    def keyPressEvent(self, event):
        """Send the current input line to the interpreter on Return/Enter.

        Every other key gets the default QTextEdit handling.  Return/Enter is
        also handled the default way while no interpreter is attached or no
        prompt has been shown yet, because there is no input line to send.
        """
        is_eol = event.key() in (Qt.Key_Return, Qt.Key_Enter)
        if not is_eol or self._interpreter is None or self._start is None:
            QTextEdit.keyPressEvent(self, event)
            return
        log.debug('console._on_eol')
        # maybe there is a clever way to get line to send
        cursor = self.cursor
        self._moveToEnd()
        eol = cursor.position()
        count = eol - self._start
        # Select backwards from the end to the start of the input ...
        cursor.movePosition(QTextCursor.Left, QTextCursor.KeepAnchor, count)
        line = cursor.selection().toPlainText()
        # ... then move back to the end, which collapses the selection.
        cursor.movePosition(QTextCursor.Right, QTextCursor.KeepAnchor, count)
        if PyV2:
            # unicode() yields the empty unicode string; a u'' literal would
            # be a syntax error on Python 3.1/3.2.
            line = unicode(line) if len(line) else unicode()
        log.debug('interpret line: /%s/', line)
        self._interpreter.interpret_line(line)
if __name__ == '__main__':
    # Demo: a TextEdit console driven by an Interpreter on its own QThread.
    app = QApplication(sys.argv)
    # setup console & proxy
    textEdit = TextEdit()
    interpreter = Interpreter(textEdit)
    # setup QThread
    task = QThread()
    task.run = interpreter.interact  # mock run: the thread body is the console loop
    interpreter.moveToThread(task)   # move QObject to thread
    textEdit.show()                  # show display
    task.start()                     # start interpreter
    app.exec_()                      # start event loop
    # The console loop blocks in raw_input() waiting for a line, so the
    # thread is terminated rather than asked to finish.
    log.debug('terminating task')
    task.terminate()
    task.wait()
    log.debug('exit')