Bonjour,

Pour commencer je débute sur python et j'utilise en ce moment une biblio qui s'appelle BACpypes pour le protocole BACnet.

Donc je voudrais créer un serveur qui utilise les commandes read property (en gros pour lire une valeur) et qui envoie des requêtes I Am

La fonction de Read je l'import depuis un fichier ReadPropertyAny.py et la fonction IAm du fichier WhoIsIAm.py

En gros je veux importer les fonctions do_read et do_iam de ReadpropertuAny.py et WhoIsIAm.py respectivement et les utiliser depuis une console.
Donc le ReadPropertyAny.py

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
 
#!/usr/bin/python
 
"""
This application presents a 'console' prompt to the user asking for read commands
which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK
and prints the value.
"""
 
import sys
 
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
 
from bacpypes.core import run
 
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
 
from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Tag
from bacpypes.constructeddata import Array
from bacpypes.basetypes import ServicesSupported
 
# some debugging
_debug = 0
_log = ModuleLogger(globals())
 
# globals
this_device = None
this_application = None
this_console = None
 
#
#   ReadPropertyAnyApplication
#
 
class ReadPropertyAnyApplication(BIPSimpleApplication):
 
    def __init__(self, *args):
        if _debug: ReadPropertyAnyApplication._debug("__init__ %r", args)
        BIPSimpleApplication.__init__(self, *args)
 
        # keep track of requests to line up responses
        self._request = None
 
    def request(self, apdu):
        if _debug: ReadPropertyAnyApplication._debug("request %r", apdu)
 
        # save a copy of the request
        self._request = apdu
 
        # forward it along
        BIPSimpleApplication.request(self, apdu)
 
    def confirmation(self, apdu):
        if _debug: ReadPropertyAnyApplication._debug("confirmation %r", apdu)
 
        if isinstance(apdu, Error):
            sys.stdout.write("error: %s\n" % (apdu.errorCode,))
            sys.stdout.flush()
 
        elif isinstance(apdu, AbortPDU):
            apdu.debug_contents()
 
        elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)):
            # peek at the value tag
            value_tag = apdu.propertyValue.tagList.Peek()
            if _debug: ReadPropertyAnyApplication._debug("    - value_tag: %r", value_tag)
 
            # make sure that it is application tagged
            if value_tag.tagClass != Tag.applicationTagClass:
                sys.stdout.write("value is not application encoded\n")
 
            else:
                # find the datatype
                datatype = Tag._app_tag_class[value_tag.tagNumber]
                if _debug: ReadPropertyAnyApplication._debug("    - datatype: %r", datatype)
                if not datatype:
                    raise TypeError, "unknown datatype"
 
                # cast out the value
                value = apdu.propertyValue.cast_out(datatype)
                if _debug: ReadPropertyAnyApplication._debug("    - value: %r", value)
 
                sys.stdout.write(str(value) + '\n')
 
            sys.stdout.flush()
 
bacpypes_debugging(ReadPropertyAnyApplication)
 
#
#   ReadPropertyAnyConsoleCmd
#
 
class ReadPropertyAnyConsoleCmd(ConsoleCmd):
 
    def do_read(self, args):
        """read <addr> <type> <inst> <prop> [ <indx> ]"""
        args = args.split()
        if _debug: ReadPropertyAnyConsoleCmd._debug("do_read %r", args)
 
        try:
            addr, obj_type, obj_inst, prop_id = args[:4]
 
            if obj_type.isdigit():
                obj_type = int(obj_type)
            elif not get_object_class(obj_type):
                raise ValueError, "unknown object type"
 
            obj_inst = int(obj_inst)
 
            if prop_id.isdigit():
                prop_id = int(prop_id)
 
            # build a request
            request = ReadPropertyRequest(
                objectIdentifier=(obj_type, obj_inst),
                propertyIdentifier=prop_id,
                )
            request.pduDestination = Address(addr)
 
            if len(args) == 5:
                request.propertyArrayIndex = int(args[4])
            if _debug: ReadPropertyAnyConsoleCmd._debug("    - request: %r", request)
 
            # give it to the application
            this_application.request(request)
 
        except Exception, e:
            ReadPropertyAnyConsoleCmd._exception("exception: %r", e)
 
bacpypes_debugging(ReadPropertyAnyConsoleCmd)
 
#
#   __main__
#
 
try:
    # parse the command line arguments
    args = ConfigArgumentParser(description=__doc__).parse_args()
 
    if _debug: _log.debug("initialization")
    if _debug: _log.debug("    - args: %r", args)
 
    # make a device object
    this_device = LocalDeviceObject(
        objectName=args.ini.objectname,
        objectIdentifier=int(args.ini.objectidentifier),
        maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
        segmentationSupported=args.ini.segmentationsupported,
        vendorIdentifier=int(args.ini.vendoridentifier),
        )
 
    # make a simple application
    this_application = ReadPropertyAnyApplication(this_device, args.ini.address)
 
    # get the services supported
    services_supported = this_application.get_services_supported()
    if _debug: _log.debug("    - services_supported: %r", services_supported)
 
    # let the device object know
    this_device.protocolServicesSupported = services_supported.value
 
    # make a console
    this_console = ReadPropertyAnyConsoleCmd()
 
    _log.debug("running")
 
    run()
 
except Exception, e:
    _log.exception("an error has occurred: %s", e)
finally:
    _log.debug("finally")
Le WhoIsIAm.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/python
 
"""
This application presents a 'console' prompt to the user asking for Who-Is and I-Am
commands which create the related APDUs, then lines up the coorresponding I-Am
for incoming traffic and prints out the contents.
"""
 
import sys
 
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
 
from bacpypes.core import run
 
from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
 
from bacpypes.apdu import WhoIsRequest, IAmRequest
from bacpypes.basetypes import ServicesSupported
from bacpypes.errors import DecodingError
 
# some debugging
_debug = 0
_log = ModuleLogger(globals())
 
# globals
this_device = None
this_application = None
this_console = None
 
#
#   WhoIsIAmApplication
#
 
class WhoIsIAmApplication(BIPSimpleApplication):
 
    def __init__(self, *args):
        if _debug: WhoIsIAmApplication._debug("__init__ %r", args)
        BIPSimpleApplication.__init__(self, *args)
 
        # keep track of requests to line up responses
        self._request = None
 
    def request(self, apdu):
        if _debug: WhoIsIAmApplication._debug("request %r", apdu)
 
        # save a copy of the request
        self._request = apdu
 
        # forward it along
        BIPSimpleApplication.request(self, apdu)
 
    def confirmation(self, apdu):
        if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu)
 
        # forward it along
        BIPSimpleApplication.confirmation(self, apdu)
 
    def indication(self, apdu):
        if _debug: WhoIsIAmApplication._debug("indication %r", apdu)
 
        if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)):
            device_type, device_instance = apdu.iAmDeviceIdentifier
            if device_type != 'device':
                raise DecodingError, "invalid object type"
 
            if (self._request.deviceInstanceRangeLowLimit is not None) and \
                (device_instance < self._request.deviceInstanceRangeLowLimit):
                pass
            elif (self._request.deviceInstanceRangeHighLimit is not None) and \
                (device_instance > self._request.deviceInstanceRangeHighLimit):
                pass
            else:
                # print out the contents
                sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n')
                sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n')
                sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n')
                sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n')
                sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n')
                sys.stdout.flush()
 
        # forward it along
        BIPSimpleApplication.indication(self, apdu)
 
bacpypes_debugging(WhoIsIAmApplication)
 
#
#   WhoIsIAmConsoleCmd
#
 
class WhoIsIAmConsoleCmd(ConsoleCmd):
 
    def do_whois(self, args):
        """whois [ <addr>] [ <lolimit> <hilimit> ]"""
        args = args.split()
        if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args)
 
        try:
            # build a request
            request = WhoIsRequest()
            if (len(args) == 1) or (len(args) == 3):
                request.pduDestination = Address(args[0])
                del args[0]
            else:
                request.pduDestination = GlobalBroadcast()
 
            if len(args) == 2:
                request.deviceInstanceRangeLowLimit = int(args[0])
                request.deviceInstanceRangeHighLimit = int(args[1])
            if _debug: WhoIsIAmConsoleCmd._debug("    - request: %r", request)
 
            # give it to the application
            this_application.request(request)
 
        except Exception, e:
            WhoIsIAmConsoleCmd._exception("exception: %r", e)
 
    def do_iam(self, args):
        """iam"""
        args = args.split()
        if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args)
 
        try:
            # build a request
            request = IAmRequest()
            request.pduDestination = GlobalBroadcast()
 
            # set the parameters from the device object
            request.iAmDeviceIdentifier = this_device.objectIdentifier
            request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted
            request.segmentationSupported = this_device.segmentationSupported
            request.vendorID = this_device.vendorIdentifier
            if _debug: WhoIsIAmConsoleCmd._debug("    - request: %r", request)
 
            # give it to the application
            this_application.request(request)
 
        except Exception, e:
            WhoIsIAmConsoleCmd._exception("exception: %r", e)
 
    def do_rtn(self, args):
        """rtn <addr> <net> ... """
        args = args.split()
        if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args)
 
        # safe to assume only one adapter
        adapter = this_application.nsap.adapters[0]
        if _debug: WhoIsIAmConsoleCmd._debug("    - adapter: %r", adapter)
 
        # provide the address and a list of network numbers
        router_address = Address(args[0])
        network_list = [int(arg) for arg in args[1:]]
 
        # pass along to the service access point
        this_application.nsap.add_router_references(adapter, router_address, network_list)
 
bacpypes_debugging(WhoIsIAmConsoleCmd)
 
#
#   __main__
#
 
try:
    # parse the command line arguments
    args = ConfigArgumentParser(description=__doc__).parse_args()
 
    if _debug: _log.debug("initialization")
    if _debug: _log.debug("    - args: %r", args)
 
    # make a device object
    this_device = LocalDeviceObject(
        objectName=args.ini.objectname,
        objectIdentifier=int(args.ini.objectidentifier),
        maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
        segmentationSupported=args.ini.segmentationsupported,
        vendorIdentifier=int(args.ini.vendoridentifier),
        )
 
    # build a bit string that knows about the bit names
    pss = ServicesSupported()
    pss['whoIs'] = 1
    pss['iAm'] = 1
    pss['readProperty'] = 1
    pss['writeProperty'] = 1
 
    # set the property value to be just the bits
    this_device.protocolServicesSupported = pss.value
 
    # make a simple application
    this_application = WhoIsIAmApplication(this_device, args.ini.address)
 
    # get the services supported
    services_supported = this_application.get_services_supported()
    if _debug: _log.debug("    - services_supported: %r", services_supported)
 
    # let the device object know
    this_device.protocolServicesSupported = services_supported.value
 
    # make a console
    this_console = WhoIsIAmConsoleCmd()
 
    _log.debug("running")
 
    run()
 
except Exception, e:
    _log.exception("an error has occurred: %s", e)
finally:
    _log.debug("finally")
Et voilà ce que moi j'ai fais le BacnetServerbrouillon.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
 
 
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.app import LocalDeviceObject
from bacpypes.core import run
from ReadPropertyAny import ReadPropertyAnyApplication, ReadPropertyAnyConsoleCmd
from WhoIsIAm import WhoIsIAmApplication, WhoIsIAmConsoleCmd
from WhoIsRouter import WhoIsRouterApplication, WhoIsRouterConsoleCmd
from bacpypes.basetypes import ServicesSupported
 
_debug=0
_log= ModuleLogger(globals())
 
#
# WhoIsIAm
#
 
class IAmapp(WhoIsIAmApplication):
    def __init__(self, *args):
        if _debug: _log.debug('IAmapp :%r', IAmapp)
        WhoIsIAmApplication.__init__(self, *args)
 
bacpypes_debugging(IAmapp)
class IAmc(WhoIsIAmConsoleCmd):
    def __init__(self, *args):
        if _debug: _log.debug('IAmc :%r', IAmc)
        WhoIsIAmConsoleCmd.__init__(self, *args)
bacpypes_debugging(IAmc)
 
class Readapp(ReadPropertyAnyApplication):
    def __init__(self, *args):
        if _debug: _log.debug('Readapp :%r', Readapp)
        ReadPropertyAnyApplication.__init__(self,*args)
bacpypes_debugging(Readapp)
class Readc(ReadPropertyAnyApplication):
    def __init__(self, *args):
        if _debug: _log.debug('Readc :%r', Readc)
        ReadPropertyAnyConsoleCmd.__init__(self,*args)
bacpypes_debugging(Readc)
 
#
#   __main__
#
 
try:
    # parse the command line arguments
    args = ConfigArgumentParser(description=__doc__).parse_args()
 
    if _debug: _log.debug("initialization")
    if _debug: _log.debug("    - args: %r", args)
 
    # make a device object
    this_device = LocalDeviceObject(
        objectName=args.ini.objectname,
        objectIdentifier=int(args.ini.objectidentifier),
        maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
        segmentationSupported=args.ini.segmentationsupported,
        vendorIdentifier=int(args.ini.vendoridentifier),
        )
 
    pss = ServicesSupported()
    pss['whoIs'] = 1
    pss['iAm'] = 1
    pss['readProperty'] = 1
    pss['writeProperty'] = 1
    pss['utcTimeSynchronization'] = 1
 
    #Make Applications
    iam=IAmapp(this_device, args.ini.address)
    read=Readapp(this_device, args.ini.address)
 
    #Get the survice supported
    services_supported1 = iam.get_services_supported()
    services_supported2 = read.get_services_supported()
    if _debug: _log.debug("    - services_supported: %r %r",services_supported1, services_supported2)
 
    _log.debug("running")
 
    Iam=IAmc()
    Read=Readc()
 
    run()
 
except Exception, e:
    _log.exception("an error has occurred: %s", e)
finally:
    _log.debug("finally")
Mais ça ne marche pas il me renvoie une erreur "Address already in use" et ceci parce que j'utilise les deux "WhoIs" et "Read" alors que si j'utilise seulement la Who Is il me renvoie 'Nontype" has no attribute 'objectIdentifier'

Mais je pense que le problème c'est que je m'y prends trop mal pour importer et utiliser ces classes.

Alors si quelqu'un à le temps et surtout le courage de lire et de m'aider j'en serais très reconnaissant.

Bonne journée.