1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
| """
Communication with embedded board
"""
import pexpect
import time
import sys
import threading
import Queue
import subprocess
import telnetlib
from cStringIO import StringIO
class Dispatcher():
def __init__(self):
self.msg_callbacks = {}
def register_callback(self, command, callback):
if (command) not in self.msg_callbacks:
self.msg_callbacks[command] = []
self.msg_callbacks [command].append((callback))
def unregister_callback(self, command, callback):
self.msg_callbacks[command].remove((callback))
class Disconnected(BaseException):
pass
class Timeout(BaseException):
pass
class MXComm():
def __init__(self):
self.is_connected = False
self._tn_to_gui = Queue.Queue()
self._gui_to_tn = Queue.Queue()
self._tx_item = None
def _connect(self):
pass
def _receive(self):
pass
def _send(self, command):
pass
def connect(self):
self._connect()
self.is_connected = True
def try_send_data(self):
try:
#get an item from the queue, try to send it
#if the item wasn't sent, we don't get another one
if self._tx_item is None:
self._tx_item = self._gui_to_tn.get_nowait()
self._send(self._tx_item)
self._tx_item = None #everything's ok, item sent
except Queue.Empty:
pass
except Timeout:
pass
except Disconnected:
self.is_connected = False
#get data from telnet session to gui
def try_get_data(self):
try:
rx_item = self._receive()
self._tn_to_gui.put_nowait(rx_item)
except Timeout:
pass
except Disconnected:
self.is_connected = False
#class ModOpenOCD(MXComm):
# def __init__(self):
# MXComm.__init__(self)
#
# def _connect(self):
# try:
# password = ""
# openocd = pexpect.spawn('sudo openocd -f %s' % (file))
# openocd.sendline(password)
# except:
# if'timeout error':
# print 'JTAG Connection error:'
# raise Disconnected()
# else:
# raise Timeout()
# #from the openocd session
# def _receive(self):
# #get the stop command
# data = raw_input()
# return data
# #send to gui
# def _send(self, data):
# #send command line result
# data = openocd.after
# return data
class ModTelnet(MXComm):
def __init__(self):
MXComm.__init__(self)
self._tn = None
def _connect(self):
#connect to telnet session @ localhost port 4444
try:
HOST = "localhost"
port = "4444"
#self._tn = telnetlib.Telnet(HOST, port)
#tn.open(HOST, port)
self._tn = pexpect.spawn('telnet')
i = self._tn.expect('telnet>', 3)
if i == 0:
print 'connected'
elif i != 0 :
print 'connection error'
except:
print "Connection refused"
def _receive(self):
#receive data (= msg) from telnet stdout
try:
saveout = sys.stdout
outfile = open('output.txt', 'w')
sys.stdout = outfile
outfile.flush()
data = outfile.readline()
outfile.close
sys.stdout = saveout
return data
except:
print 'Connection error:'
raise Disconnected()
def _send(self, command):
#send command to telnet session
try:
self._tn.send('help')
except :
print 'Connection error:'
raise Disconnected()
class MODiMX27(threading.Thread):
"""
Provides communication with a MDDiMX27 board
It's using pexpect, openOCD and telnet to detect and communicate with the board.
The class creates a thread with two queue to read/write data asynchronously.
"""
def __init__(self, connection):
threading.Thread.__init__(self)
self._conn = connection
self._dispatch = Dispatcher()
self._is_running = True
self.start()
def __del__(self):
self.stop()
if self.isAlive():
self.join()
def stop(self):
self._is_running = False
def read_packet(self):
try:
return self._get_packet()
except Queue.Empty:
return
def register_callbacks(self, callback_array):
for command, callback in callback_array:
self._dispatch.register_callback(command, callback)
def unregister_callbacks(self, callback_array):
for command, callback in callback_array:
self._dispatch.unregister_callback(command, callback)
def _get_packet(self):
""" Get a message from telnet or the openOCD session (non blocking call),
raise queue empty if no message available """
return self._conn._tn_to_gui.get_nowait()
#self._conn.openocd_to_gui.get_nowait())
def send_packet(self, command):
"""Send a message to the telnet or Openocd session (non-blocking call)"""
self._conn._gui_to_tn.put_nowait(command)
def is_connected(self):
return self._conn.is_connected
def run(self):
"""Thread main: read and write alternatively"""
while self._is_running:
if not self._conn.is_connected:
try:
self._conn.connect()
except :
time.sleep(1.0)
else:
#-------- PC -> IMX -----
self._conn.try_send_data()
#-------- IMX -> PC -----
self._conn.try_get_data()
if __name__ =='__main__':
try:
tn = ModTelnet()
#connect to telnet session
board = MODiMX27(tn)
except:
print 'Cannot connect to MODiMX27!'
exit(0) |
Partager