Bonjour,
Pour commencer je débute sur python et j'utilise en ce moment une biblio qui s'appelle BACpypes pour le protocole BACnet.
Donc je voudrais créer un serveur qui utilise les commandes read property (en gros pour lire une valeur) et qui envoie des requêtes I Am
La fonction de Read je l'import depuis un fichier ReadPropertyAny.py et la fonction IAm du fichier WhoIsIAm.py
En gros je veux importer les fonctions do_read et do_iam de ReadpropertuAny.py et WhoIsIAm.py respectivement et les utiliser depuis une console.
Donc le ReadPropertyAny.py
Le WhoIsIAm.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177 #!/usr/bin/python """ This application presents a 'console' prompt to the user asking for read commands which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK and prints the value. """ import sys from bacpypes.debugging import bacpypes_debugging, ModuleLogger from bacpypes.consolelogging import ConfigArgumentParser from bacpypes.consolecmd import ConsoleCmd from bacpypes.core import run from bacpypes.pdu import Address from bacpypes.app import LocalDeviceObject, BIPSimpleApplication from bacpypes.object import get_object_class, get_datatype from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK from bacpypes.primitivedata import Tag from bacpypes.constructeddata import Array from bacpypes.basetypes import ServicesSupported # some debugging _debug = 0 _log = ModuleLogger(globals()) # globals this_device = None this_application = None this_console = None # # ReadPropertyAnyApplication # class ReadPropertyAnyApplication(BIPSimpleApplication): def __init__(self, *args): if _debug: ReadPropertyAnyApplication._debug("__init__ %r", args) BIPSimpleApplication.__init__(self, *args) # keep track of requests to line up responses self._request = None def request(self, apdu): if _debug: ReadPropertyAnyApplication._debug("request %r", apdu) # save a copy of the request self._request = apdu # forward it along BIPSimpleApplication.request(self, apdu) def confirmation(self, apdu): if _debug: ReadPropertyAnyApplication._debug("confirmation %r", apdu) if isinstance(apdu, Error): sys.stdout.write("error: %s\n" % (apdu.errorCode,)) sys.stdout.flush() elif isinstance(apdu, AbortPDU): apdu.debug_contents() elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): # peek at the value tag value_tag = apdu.propertyValue.tagList.Peek() if _debug: ReadPropertyAnyApplication._debug(" - value_tag: %r", value_tag) # make sure that it is application tagged if value_tag.tagClass != Tag.applicationTagClass: sys.stdout.write("value is not application encoded\n") else: # find the datatype datatype = Tag._app_tag_class[value_tag.tagNumber] if _debug: ReadPropertyAnyApplication._debug(" - datatype: %r", datatype) if not datatype: raise TypeError, "unknown datatype" # cast out the value value = apdu.propertyValue.cast_out(datatype) if _debug: ReadPropertyAnyApplication._debug(" - value: %r", value) sys.stdout.write(str(value) + '\n') sys.stdout.flush() bacpypes_debugging(ReadPropertyAnyApplication) # # ReadPropertyAnyConsoleCmd # class ReadPropertyAnyConsoleCmd(ConsoleCmd): def do_read(self, args): """read <addr> <type> <inst> <prop> [ <indx> ]""" args = args.split() if _debug: ReadPropertyAnyConsoleCmd._debug("do_read %r", args) try: addr, obj_type, obj_inst, prop_id = args[:4] if obj_type.isdigit(): obj_type = int(obj_type) elif not get_object_class(obj_type): raise ValueError, "unknown object type" obj_inst = int(obj_inst) if prop_id.isdigit(): prop_id = int(prop_id) # build a request request = ReadPropertyRequest( objectIdentifier=(obj_type, obj_inst), propertyIdentifier=prop_id, ) request.pduDestination = Address(addr) if len(args) == 5: request.propertyArrayIndex = int(args[4]) if _debug: ReadPropertyAnyConsoleCmd._debug(" - request: %r", request) # give it to the application this_application.request(request) except Exception, e: ReadPropertyAnyConsoleCmd._exception("exception: %r", e) bacpypes_debugging(ReadPropertyAnyConsoleCmd) # # __main__ # try: # parse the command line arguments args = ConfigArgumentParser(description=__doc__).parse_args() if _debug: _log.debug("initialization") if _debug: _log.debug(" - args: %r", args) # make a device object this_device = LocalDeviceObject( objectName=args.ini.objectname, objectIdentifier=int(args.ini.objectidentifier), maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), segmentationSupported=args.ini.segmentationsupported, vendorIdentifier=int(args.ini.vendoridentifier), ) # make a simple application this_application = ReadPropertyAnyApplication(this_device, args.ini.address) # get the services supported services_supported = this_application.get_services_supported() if _debug: _log.debug(" - services_supported: %r", services_supported) # let the device object know this_device.protocolServicesSupported = services_supported.value # make a console this_console = ReadPropertyAnyConsoleCmd() _log.debug("running") run() except Exception, e: _log.exception("an error has occurred: %s", e) finally: _log.debug("finally")
Et voilà ce que moi j'ai fais le BacnetServerbrouillon.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 #!/usr/bin/python """ This application presents a 'console' prompt to the user asking for Who-Is and I-Am commands which create the related APDUs, then lines up the coorresponding I-Am for incoming traffic and prints out the contents. """ import sys from bacpypes.debugging import bacpypes_debugging, ModuleLogger from bacpypes.consolelogging import ConfigArgumentParser from bacpypes.consolecmd import ConsoleCmd from bacpypes.core import run from bacpypes.pdu import Address, GlobalBroadcast from bacpypes.app import LocalDeviceObject, BIPSimpleApplication from bacpypes.apdu import WhoIsRequest, IAmRequest from bacpypes.basetypes import ServicesSupported from bacpypes.errors import DecodingError # some debugging _debug = 0 _log = ModuleLogger(globals()) # globals this_device = None this_application = None this_console = None # # WhoIsIAmApplication # class WhoIsIAmApplication(BIPSimpleApplication): def __init__(self, *args): if _debug: WhoIsIAmApplication._debug("__init__ %r", args) BIPSimpleApplication.__init__(self, *args) # keep track of requests to line up responses self._request = None def request(self, apdu): if _debug: WhoIsIAmApplication._debug("request %r", apdu) # save a copy of the request self._request = apdu # forward it along BIPSimpleApplication.request(self, apdu) def confirmation(self, apdu): if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu) # forward it along BIPSimpleApplication.confirmation(self, apdu) def indication(self, apdu): if _debug: WhoIsIAmApplication._debug("indication %r", apdu) if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)): device_type, device_instance = apdu.iAmDeviceIdentifier if device_type != 'device': raise DecodingError, "invalid object type" if (self._request.deviceInstanceRangeLowLimit is not None) and \ (device_instance < self._request.deviceInstanceRangeLowLimit): pass elif (self._request.deviceInstanceRangeHighLimit is not None) and \ (device_instance > self._request.deviceInstanceRangeHighLimit): pass else: # print out the contents sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n') sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n') sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n') sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n') sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n') sys.stdout.flush() # forward it along BIPSimpleApplication.indication(self, apdu) bacpypes_debugging(WhoIsIAmApplication) # # WhoIsIAmConsoleCmd # class WhoIsIAmConsoleCmd(ConsoleCmd): def do_whois(self, args): """whois [ <addr>] [ <lolimit> <hilimit> ]""" args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args) try: # build a request request = WhoIsRequest() if (len(args) == 1) or (len(args) == 3): request.pduDestination = Address(args[0]) del args[0] else: request.pduDestination = GlobalBroadcast() if len(args) == 2: request.deviceInstanceRangeLowLimit = int(args[0]) request.deviceInstanceRangeHighLimit = int(args[1]) if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) # give it to the application this_application.request(request) except Exception, e: WhoIsIAmConsoleCmd._exception("exception: %r", e) def do_iam(self, args): """iam""" args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args) try: # build a request request = IAmRequest() request.pduDestination = GlobalBroadcast() # set the parameters from the device object request.iAmDeviceIdentifier = this_device.objectIdentifier request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted request.segmentationSupported = this_device.segmentationSupported request.vendorID = this_device.vendorIdentifier if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) # give it to the application this_application.request(request) except Exception, e: WhoIsIAmConsoleCmd._exception("exception: %r", e) def do_rtn(self, args): """rtn <addr> <net> ... """ args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args) # safe to assume only one adapter adapter = this_application.nsap.adapters[0] if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter) # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point this_application.nsap.add_router_references(adapter, router_address, network_list) bacpypes_debugging(WhoIsIAmConsoleCmd) # # __main__ # try: # parse the command line arguments args = ConfigArgumentParser(description=__doc__).parse_args() if _debug: _log.debug("initialization") if _debug: _log.debug(" - args: %r", args) # make a device object this_device = LocalDeviceObject( objectName=args.ini.objectname, objectIdentifier=int(args.ini.objectidentifier), maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), segmentationSupported=args.ini.segmentationsupported, vendorIdentifier=int(args.ini.vendoridentifier), ) # build a bit string that knows about the bit names pss = ServicesSupported() pss['whoIs'] = 1 pss['iAm'] = 1 pss['readProperty'] = 1 pss['writeProperty'] = 1 # set the property value to be just the bits this_device.protocolServicesSupported = pss.value # make a simple application this_application = WhoIsIAmApplication(this_device, args.ini.address) # get the services supported services_supported = this_application.get_services_supported() if _debug: _log.debug(" - services_supported: %r", services_supported) # let the device object know this_device.protocolServicesSupported = services_supported.value # make a console this_console = WhoIsIAmConsoleCmd() _log.debug("running") run() except Exception, e: _log.exception("an error has occurred: %s", e) finally: _log.debug("finally")
Mais ça ne marche pas il me renvoie une erreur "Address already in use" et ceci parce que j'utilise les deux "WhoIs" et "Read" alors que si j'utilise seulement la Who Is il me renvoie 'Nontype" has no attribute 'objectIdentifier'
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 from bacpypes.consolelogging import ConfigArgumentParser from bacpypes.debugging import bacpypes_debugging, ModuleLogger from bacpypes.app import LocalDeviceObject from bacpypes.core import run from ReadPropertyAny import ReadPropertyAnyApplication, ReadPropertyAnyConsoleCmd from WhoIsIAm import WhoIsIAmApplication, WhoIsIAmConsoleCmd from WhoIsRouter import WhoIsRouterApplication, WhoIsRouterConsoleCmd from bacpypes.basetypes import ServicesSupported _debug=0 _log= ModuleLogger(globals()) # # WhoIsIAm # class IAmapp(WhoIsIAmApplication): def __init__(self, *args): if _debug: _log.debug('IAmapp :%r', IAmapp) WhoIsIAmApplication.__init__(self, *args) bacpypes_debugging(IAmapp) class IAmc(WhoIsIAmConsoleCmd): def __init__(self, *args): if _debug: _log.debug('IAmc :%r', IAmc) WhoIsIAmConsoleCmd.__init__(self, *args) bacpypes_debugging(IAmc) class Readapp(ReadPropertyAnyApplication): def __init__(self, *args): if _debug: _log.debug('Readapp :%r', Readapp) ReadPropertyAnyApplication.__init__(self,*args) bacpypes_debugging(Readapp) class Readc(ReadPropertyAnyApplication): def __init__(self, *args): if _debug: _log.debug('Readc :%r', Readc) ReadPropertyAnyConsoleCmd.__init__(self,*args) bacpypes_debugging(Readc) # # __main__ # try: # parse the command line arguments args = ConfigArgumentParser(description=__doc__).parse_args() if _debug: _log.debug("initialization") if _debug: _log.debug(" - args: %r", args) # make a device object this_device = LocalDeviceObject( objectName=args.ini.objectname, objectIdentifier=int(args.ini.objectidentifier), maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), segmentationSupported=args.ini.segmentationsupported, vendorIdentifier=int(args.ini.vendoridentifier), ) pss = ServicesSupported() pss['whoIs'] = 1 pss['iAm'] = 1 pss['readProperty'] = 1 pss['writeProperty'] = 1 pss['utcTimeSynchronization'] = 1 #Make Applications iam=IAmapp(this_device, args.ini.address) read=Readapp(this_device, args.ini.address) #Get the survice supported services_supported1 = iam.get_services_supported() services_supported2 = read.get_services_supported() if _debug: _log.debug(" - services_supported: %r %r",services_supported1, services_supported2) _log.debug("running") Iam=IAmc() Read=Readc() run() except Exception, e: _log.exception("an error has occurred: %s", e) finally: _log.debug("finally")
Mais je pense que le problème c'est que je m'y prends trop mal pour importer et utiliser ces classes.
Alors si quelqu'un à le temps et surtout le courage de lire et de m'aider j'en serais très reconnaissant.
Bonne journée.
Partager