1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
"""
This framework for Python Xapps provides classes that Xapp writers
should instantiate and/or subclass depending on their needs.
"""
import json
import os
import queue
import time
from threading import Thread
from typing import List, Set

import inotify_simple
import requests
from mdclogpy import Logger

import ricxappframe.entities.rnib.cell_pb2 as pb_cell
import ricxappframe.entities.rnib.nodeb_info_pb2 as pb_nbi
from ricxappframe import xapp_rmr
from ricxappframe.constants import sdl_namespaces
from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
from ricxappframe.rmr import rmr
from ricxappframe.util.constants import Constants
from ricxappframe.xapp_sdl import SDLWrapper
48 This class initializes RMR, starts a thread that checks for incoming
49 messages, provisions an SDL object and optionally creates a
50 config-file watcher. This private base class should not be
51 instantiated by clients directly, but it defines many public methods
52 that may be used by clients.
If environment variable CONFIG_FILE is defined, and that variable
contains a path to an existing file, a watcher is defined to monitor
modifications (writes) to that file using the Linux kernel's inotify
feature. The watcher must be polled by calling method config_check().
62 rmr_port: int (optional, default is 4562)
63 Port on which the RMR library listens for incoming messages.
65 rmr_wait_for_ready: bool (optional, default is True)
66 If this is True, then init waits until RMR is ready to send,
67 which includes having a valid routing file. This can be set
68 to False if the client wants to *receive only*.
70 use_fake_sdl: bool (optional, default is False)
71 if this is True, it uses the DBaaS "fake dict backend" instead
72 of Redis or other backends. Set this to True when developing
73 an xapp or during unit testing to eliminate the need for DBaaS.
75 post_init: function (optional, default is None)
76 Runs this user-provided function at the end of the init method;
77 its signature should be post_init(self)
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
    """
    Set up logging, RMR, SDL, the config-file watcher and xapp registration.

    Parameters
    ----------
    rmr_port: int (optional, default is 4562)
        Port on which the RMR library listens for incoming messages.
    rmr_wait_for_ready: bool (optional, default is True)
        Passed to the RMR loop as ``wait_for_ready``.
    use_fake_sdl: bool (optional, default is False)
        Passed to SDLWrapper to select the fake backend.
    post_init: function (optional, default is None)
        Called as post_init(self) at the end of this method.
    """
    # PUBLIC, can be used by xapps using self.(name):
    self.logger = Logger(name=__name__)
    self._appthread = None

    # Start rmr rcv thread
    self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
    self._mrc = self._rmr_loop.mrc  # for convenience

    self.sdl = SDLWrapper(use_fake_sdl)

    # The environment variable specifies the path to the Xapp config file
    self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None)
    have_config = bool(self._config_path) and os.path.isfile(self._config_path)
    if have_config:
        self._inotify = inotify_simple.INotify()
        self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
        self.logger.debug("__init__: watching config file {}".format(self._config_path))
    else:
        # config_check() tests this attribute, so it must always exist.
        self._inotify = None
        self.logger.warning("__init__: NOT watching any config file")

    # used for thread control of Registration of Xapp
    self._keep_registration = True

    # configuration data for xapp registration and deregistration
    self._config_data = None
    if have_config:
        with open(self._config_path) as json_file:
            self._config_data = json.load(json_file)
    else:
        self._keep_registration = False
        self.logger.error("__init__: Cannot Read config file for xapp Registration")
        self._config_data = {}

    # Thread.start() returns None, so keep the Thread object itself;
    # otherwise the join in the shutdown path can never happen.
    self._appthread = Thread(target=self.registerXapp)
    self._appthread.start()

    # run the optionally provided user post init
    if post_init:
        post_init(self)
def get_service(self, host, service):
    """
    To find the url for connecting to the service

    The lookup key is ``service`` formatted with the upper-cased app
    namespace and host, with "-" replaced by "_"; the value is read from
    the process environment.

    Parameters
    ----------
    host: string
        defines the hostname in the url
    service: string
        defines the servicename in the url (a format string taking
        namespace and host)

    Returns
    -------
    string
        The part of the environment value after the first "//", or ""
        when the variable is unset or has no "//" separator.
        NOTE(review): the return statements were missing from the reviewed
        text and are restored from how register() consumes the result.
    """
    app_namespace = self._config_data.get("APP_NAMESPACE")
    if app_namespace is None:
        app_namespace = Constants.DEFAULT_XAPP_NS
    self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace))
    if app_namespace is not None and host is not None:
        svc = service.format(app_namespace.upper(), host.upper())
        urlkey = svc.replace("-", "_")
        # Default to "" so an unset variable yields "" instead of raising
        # AttributeError on None.split().
        url = os.environ.get(urlkey, "").split("//")
        self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url))
        if len(url) > 1:
            return url[1]
    return ""
def do_post(self, plt_namespace, url, msg):
    """
    registration of the xapp using the url and json msg

    Parameters
    ----------
    plt_namespace: string
        platform namespace where the xapp is running
    url: string
        url for xapp registration (a format string taking the namespace twice)
    msg: dict
        json msg containing the xapp details

    Returns
    -------
    bool
        True only when the POST answers 200 or 201; False for missing
        arguments or any request failure.
    """
    if url is None:
        self.logger.error("url is empty ")
        return False
    if plt_namespace is None:
        self.logger.error("plt_namespace is empty")
        return False
    try:
        request_url = url.format(plt_namespace, plt_namespace)
        resp = requests.post(request_url, json=msg)
        self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
        self.logger.debug("Response Text : {}".format(resp.text))
        return resp.status_code in (200, 201)
    # Specific handlers first: RequestException is their base class and
    # would otherwise make them unreachable.
    except requests.exceptions.HTTPError as errh:
        self.logger.error("Http Error: {}".format(errh))
    except requests.exceptions.ConnectionError as errc:
        self.logger.error("Error Connecting: {}".format(errc))
    except requests.exceptions.Timeout as errt:
        self.logger.error("Timeout Error: {}".format(errt))
    except requests.exceptions.RequestException as err:
        self.logger.error("Error : {}".format(err))
    # A failed request must be falsy so callers do not treat it as success.
    return False
def register(self):
    """
    function to registers the xapp

    NOTE(review): the ``def`` line, the logger calls, the ``return False``
    and two dictionary entries ("appName", "configPath") were missing from
    the reviewed text and are restored from context; confirm them against
    the registration API.

    Returns
    -------
    bool
        whether or not the xapp is registered
    """
    hostname = os.environ.get("HOSTNAME")
    xappname = self._config_data.get("name")
    xappversion = self._config_data.get("version")
    pltnamespace = os.environ.get("PLT_NAMESPACE")
    if pltnamespace is None:
        pltnamespace = Constants.DEFAULT_PLT_NS
    self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format(
        hostname, xappname, xappversion, pltnamespace))

    http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP)
    rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR)
    if http_endpoint == "" or rmr_endpoint == "":
        self.logger.error(
            "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint,
                                                                                          rmr_endpoint))
        return False
    self.logger.debug(
        "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint "
        ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint,
                                      self._config_data.get("CONFIG_PATH")))
    request_string = {
        "appName": hostname,  # NOTE(review): restored entry, verify
        "appVersion": xappversion,
        "configPath": "",  # NOTE(review): restored entry, verify
        "appInstanceName": xappname,
        "httpEndpoint": http_endpoint,
        "rmrEndpoint": rmr_endpoint,
        "config": json.dumps(self._config_data)
    }
    self.logger.info("REQUEST STRING :{}".format(request_string))
    return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string)
def registerXapp(self):
    """
    Retry loop that registers the xapp once it reports healthy.

    Runs while ``self._keep_registration`` is true and retries remain.
    NOTE(review): the retry count, the sleep interval, the ``continue``,
    the ``register()`` call and the ``break`` were missing from the
    reviewed text; the values below are assumptions to confirm.
    """
    retries = 5  # NOTE(review): assumed initial value
    while self._keep_registration and retries > 0:
        time.sleep(2)  # NOTE(review): assumed back-off between attempts
        retries -= 1
        # checking for rmr/sdl/xapp health
        healthy = self.healthcheck()
        if not healthy:
            self.logger.warning(
                "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
            continue

        self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(
            self._config_data.get("name")))
        if self.register():
            self.logger.debug("Registration done, proceeding with startup ...")
            break
def deregister(self):
    """
    Deregisters the xapp by posting to the deregistration path.

    NOTE(review): the early returns and the "appName" entry were missing
    from the reviewed text and are restored from context.

    Returns
    -------
    bool
        Result of the POST; False when the xapp is unhealthy or has no
        configuration data.
    """
    healthy = self.healthcheck()
    if not healthy:
        self.logger.error("RMR or SDL or xapp == Not Healthy")
        return False
    if self._config_data is None:
        return False
    name = os.environ.get("HOSTNAME")
    xappname = self._config_data.get("name")
    pltnamespace = os.environ.get("PLT_NAMESPACE")
    if pltnamespace is None:
        pltnamespace = Constants.DEFAULT_PLT_NS
    request_string = {
        "appName": name,  # NOTE(review): restored entry, verify
        "appInstanceName": xappname,
    }

    return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string)
def xapp_shutdown(self):
    """
    Deregisters the xapp while shutting down

    NOTE(review): the deregister() call and the wait were missing from
    the reviewed text; the 10 second pause is an assumption to confirm.
    """
    self.deregister()
    self.logger.debug("Wait for xapp to get unregistered")
    time.sleep(10)  # NOTE(review): assumed duration
def rmr_get_messages(self):
    """
    Returns a generator iterable over all items in the queue that
    have not yet been read by the client xapp. Each item is a tuple
    (S, sbuf) where S is a message summary dict and sbuf is the raw
    message. The caller MUST call rmr.rmr_free_msg(sbuf) when
    finished with each sbuf to prevent memory leaks!
    """
    rcv_queue = self._rmr_loop.rcv_queue
    while not rcv_queue.empty():
        (summary, sbuf) = rcv_queue.get()
        yield (summary, sbuf)
def rmr_send(self, payload, mtype, retries=100):
    """
    Allocates a buffer, sets payload and mtype, and sends

    Parameters
    ----------
    payload: bytes
        payload to set
    mtype: int
        message type
    retries: int (optional)
        Number of times to retry at the application level

    Returns
    -------
    bool
        whether or not the send worked after retries attempts

    NOTE(review): the ``mtype=`` argument, the buffer frees and the
    return statements were missing from the reviewed text and are
    restored; this method allocates the buffer, so it releases it on
    both paths.
    """
    sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True,
                             mtype=mtype)

    sent = False
    for _ in range(retries):
        sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
        if sbuf.contents.state == 0:
            sent = True
            break

    self.rmr_free(sbuf)
    return sent
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
    """
    Allows the xapp to return to sender, possibly adjusting the
    payload and message type before doing so. This does NOT free
    the sbuf for the caller as the caller may wish to perform
    multiple rts per buffer. The client needs to free.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    new_payload: bytes (optional)
        New payload to set
    new_mtype: int (optional)
        New message type (replaces the received message)
    retries: int (optional, default 100)
        Number of times to retry at the application level

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    for _ in range(retries):
        sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
        if sbuf.contents.state == 0:
            return True

    self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
    return False
def rmr_free(self, sbuf):
    """
    Frees an rmr message buffer after use

    Note: this does not need to be a class method, self is not
    used. It is kept here so that clients have one place to find it.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    rmr.rmr_free_msg(sbuf)
384 # Convenience (pass-thru) function for invoking SDL.
def sdl_set(self, namespace, key, value, usemsgpack=True):
    """
    ** Deprecate Warning **
    ** Will be removed in a future function **

    Stores a key-value pair to SDL, optionally serializing the value
    to bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    value:
        Object or byte array to store. See the `usemsgpack` parameter.
    usemsgpack: boolean (optional, default is True)
        Determines whether the value is serialized using msgpack before storing.
        If usemsgpack is True, the value can be anything that is
        serializable by msgpack.
        If usemsgpack is False, the value must be bytes.
    """
    self.sdl.set(namespace, key, value, usemsgpack)
def sdl_get(self, namespace, key, usemsgpack=True):
    """
    ** Deprecate Warning **
    ** Will be removed in a future function **

    Gets the value for the specified namespace and key from SDL,
    optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    usemsgpack: boolean (optional, default is True)
        If usemsgpack is True, the byte array stored by SDL is deserialized
        using msgpack to yield the original object that was stored.
        If usemsgpack is False, the byte array stored by SDL is returned
        without further processing.

    Returns
    -------
    Value
        See the usemsgpack parameter for an explanation of the returned value type.
        Answers None if the key is not found.
    """
    return self.sdl.get(namespace, key, usemsgpack)
def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
    """
    ** Deprecate Warning **
    ** Will be removed in a future function **

    Gets all key-value pairs in the specified namespace
    with keys that start with the specified prefix,
    optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    prefix: string
        the key prefix
    usemsgpack: boolean (optional, default is True)
        If usemsgpack is True, the byte array stored by SDL is deserialized
        using msgpack to yield the original value that was stored.
        If usemsgpack is False, the byte array stored by SDL is returned
        without further processing.

    Returns
    -------
    Dictionary of key-value pairs
        Each key has the specified prefix.
        The value object (its type) depends on the usemsgpack parameter,
        but is either a Python object or raw bytes as discussed above.
        Answers an empty dictionary if no keys matched the prefix.
    """
    return self.sdl.find_and_get(namespace, prefix, usemsgpack)
def sdl_delete(self, namespace, key):
    """
    ** Deprecate Warning **
    ** Will be removed in a future function **

    Deletes the key-value pair with the specified key in the specified namespace.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    """
    self.sdl.delete(namespace, key)
def _get_rnib_info(self, node_type):
    """
    Since the difference between get_list_gnb_ids and get_list_enb_ids is only node-type,
    this function extracted from the duplicated logic.

    Parameters
    ----------
    node_type: string
        Type of node. This is EnumDescriptor.

    Returns
    -------
    List[NbIdentity]
        One parsed NbIdentity per member stored under ``node_type`` in
        the E2 Manager namespace.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
    ret: List[NbIdentity] = []
    for nbid_string in nbid_strings:
        nbid = NbIdentity()
        nbid.ParseFromString(nbid_string)
        ret.append(nbid)
    return ret
def get_list_gnb_ids(self):
    """
    Retrieves the list of gNodeb identity entities

    gNodeb information is stored in SDL by E2Manager. Therefore, gNode information
    is stored in SDL's `e2Manager` namespace as protobuf serialized.

    Returns
    -------
    List[NbIdentity]
        Result of _get_rnib_info for the GNB node type.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    return self._get_rnib_info(Node.Type.Name(Node.GNB))
def get_list_enb_ids(self):
    """
    Retrieves the list of eNodeb identity entities

    eNodeb information is stored in SDL by E2Manager. Therefore, eNode information
    is stored in SDL's `e2Manager` namespace as protobuf serialized.

    Returns
    -------
    List[NbIdentity]
        Result of _get_rnib_info for the ENB node type.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    return self._get_rnib_info(Node.Type.Name(Node.ENB))
# The following RNIB methods mirror the Go-based RNIB methods.
# Method names are the same as in the repository:
# gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/rnib
def GetNodeb(self, inventoryName):
    """
    Returns nodeb info
    In RNIB SDL key is defined following way: RAN:<inventoryName>

    Parameters
    ----------
    inventoryName: string

    Returns
    -------
    NodebInfo()
        Parsed entry, or None when the key is not found.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    # sdl_get with usemsgpack=False answers the raw stored value or None.
    nbid_string = self.sdl_get(sdl_namespaces.E2_MANAGER, 'RAN:' + inventoryName, usemsgpack=False)
    if nbid_string is None:
        return None
    nbinfo = pb_nbi.NodebInfo()
    nbinfo.ParseFromString(nbid_string)
    return nbinfo
def GetNodebByGlobalNbId(self, nodeType, plmnId, nbId):
    """
    Returns nodeb identity based on type, plmn id and node id
    In RNIB SDL key is defined following way: <nodeType>:<plmnId>:<nbId>

    Parameters
    ----------
    nodeType: string
    plmnId: string
    nbId: string

    Returns
    -------
    NbIdentity()
        Parsed entry, or None when the key is not found.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    sdl_key = nodeType + ':' + plmnId + ':' + nbId
    nbid_string = self.sdl_get(sdl_namespaces.E2_MANAGER, sdl_key, usemsgpack=False)
    if nbid_string is None:
        return None
    nbid = NbIdentity()
    nbid.ParseFromString(nbid_string)
    return nbid
def GetCellList(self, inventoryName):
    """
    Returns nodeb served cell list from the saved node data
    In RNIB SDL key is defined following way: RAN:<inventoryName>

    Parameters
    ----------
    inventoryName: string

    Returns
    -------
    ServedCellInfo() in case of ENB
    ServedNRCell() in case of GNB
    None when the node is unknown or is neither ENB nor GNB.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    nodeb = self.GetNodeb(inventoryName)
    if nodeb is None:
        return None
    if nodeb.HasField('enb'):
        return nodeb.enb.served_cells
    if nodeb.HasField('gnb'):
        return nodeb.gnb.served_nr_cells
    return None
def GetCellById(self, cell_type, cell_id):
    """
    Returns cell info by cell type and id.
    In RNIB SDL keys are defined based on the cell type:
    ENB type CELL:<cell_id>
    GNB type NRCELL:<cell_id>

    Parameters
    ----------
    cell_type: string
        Name of the Cell.Type enum value (LTE_CELL or NR_CELL)
    cell_id: string

    Returns
    -------
    Cell()
        Parsed entry, or None for an unknown cell type or missing key.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    # NOTE(review): the two prefix assignments were missing from the
    # reviewed text; they are restored from the key layout documented above.
    cellstr = None
    if cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.LTE_CELL):
        cellstr = 'CELL'
    elif cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.NR_CELL):
        cellstr = 'NRCELL'
    if cellstr is not None:
        cell_string = self.sdl_get(sdl_namespaces.E2_MANAGER, cellstr + ':' + cell_id, usemsgpack=False)
        if cell_string is not None:
            cell = pb_cell.Cell()
            cell.ParseFromString(cell_string)
            return cell
    return None
def GetListNodebIds(self):
    """
    Returns both enb and gnb NbIdentity list

    Returns
    -------
    List[NbIdentity]
        ENB identities followed by GNB identities.
        NOTE(review): the merge and return lines were missing from the
        reviewed text and are restored from the summary above.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    nlist1 = self._get_rnib_info(Node.Type.Name(Node.ENB))
    nlist2 = self._get_rnib_info(Node.Type.Name(Node.GNB))
    nlist1.extend(nlist2)
    return nlist1
def GetCell(self, inventoryName, pci):
    """
    Returns cell info using pci
    In RNIB SDL key is defined following way: PCI:<inventoryName>:<pci hex val>

    Parameters
    ----------
    inventoryName: string
    pci: int
        Formatted into the key as at least two lower-case hex digits.

    Returns
    -------
    Cell()
        Parsed entry, or None when the key is not found.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    sdl_key = 'PCI:{0:s}:{1:02x}'.format(inventoryName, pci)
    cell_string = self.sdl_get(sdl_namespaces.E2_MANAGER, sdl_key, usemsgpack=False)
    if cell_string is None:
        return None
    cell = pb_cell.Cell()
    cell.ParseFromString(cell_string)
    return cell
def GetRanFunctionDefinition(self, inventoryName, ran_function_oid):
    """
    Returns GNB ran function definition list based on the ran_function_oid
    In RNIB SDL key is defined following way: RAN:<inventoryName>

    Parameters
    ----------
    inventoryName: string
    ran_function_oid: int

    Returns
    -------
    array of ran_function_definition matching to ran_function_oid,
    or None when the node is unknown or has no gnb field.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    nodeb = self.GetNodeb(inventoryName)
    if nodeb is not None:
        if nodeb.HasField('gnb') and nodeb.gnb.ran_functions is not None:
            return [
                rf.ran_function_definition
                for rf in nodeb.gnb.ran_functions
                if rf.ran_function_oid == ran_function_oid
            ]
    return None
def healthcheck(self):
    """
    Combined health of the RMR loop and SDL.

    Returns the RMR loop's healthcheck result if it is falsy (SDL is then
    not consulted); otherwise returns the SDL healthcheck result.
    """
    return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
777 # Convenience function for discovering config change events
def config_check(self, timeout=0):
    """
    Checks the watcher for configuration-file events. The watcher
    prerequisites and event mask are documented in __init__().

    Parameters
    ----------
    timeout: int (optional)
        Number of seconds to wait for a configuration-file event, default 0.
        Passed unchanged to the watcher's read() call.

    Returns
    -------
    List of Events, possibly empty
        An event is a tuple with objects wd, mask, cookie and name.
        For example::

            Event(wd=1, mask=1073742080, cookie=0, name='foo')

        An empty list is returned when no watcher was created.
    """
    if not self._inotify:
        return []
    events = self._inotify.read(timeout=timeout)
    return list(events)
def stop(self):
    """
    cleans up and stops the xapp rmr thread (currently). This is
    critical for unit testing as pytest will never return if the
    thread is running.

    TODO: can we register a ctrl-c handler so this gets called on
    ctrl-c? Because currently two ctrl-c are needed to stop.

    NOTE(review): the ``def`` line and the xapp_shutdown() call were
    missing from the reviewed text and are restored from context.
    """
    # Wait for the registration thread, if one was recorded.
    if self._appthread is not None:
        self._appthread.join()

    self.xapp_shutdown()

    self._rmr_loop.stop()
820 # Public classes that Xapp writers should instantiate or subclass
821 # to implement an Xapp.
class RMRXapp(_BaseXapp):
    """
    Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
    only performs an action when a message is received. Clients should
    invoke the run method, which has a loop that waits for RMR messages
    and calls the appropriate client-registered consume callback on each.

    If environment variable CONFIG_FILE is defined, and that variable
    contains a path to an existing file, this class polls a watcher
    defined on that file to detect file-write events, and invokes a
    configuration-change handler on each event. The handler is also
    invoked at startup. If no handler function is supplied to the
    constructor, this class defines a default handler that only logs a
    message.

    Parameters
    ----------
    default_handler: function
        A function with the signature (summary, sbuf) to be called when a
        message type is received for which no other handler is registered.
    default_handler argument summary: dict
        The RMR message summary, a dict of key-value pairs
    default_handler argument sbuf: ctypes c_void_p
        Pointer to an RMR message buffer. The user must call free on this when done.
    config_handler: function (optional, default is documented above)
        A function with the signature (json) to be called at startup and each time
        a configuration-file change event is detected. The JSON object is read from
        the configuration file, if the prerequisites are met.
    config_handler argument json: dict
        The contents of the configuration file, parsed as JSON.
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    post_init: function (optional, default None)
        Run this function after the app initializes and before the dispatch loop starts;
        its signature should be post_init(self)

    NOTE(review): several short statements (``super()`` calls, the
    ``_dispatch`` table, ``def run``/``def stop`` headers, the
    ``queue.Empty`` handler) were missing from the reviewed text and are
    restored from context; confirm them.
    """

    def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False,
                 post_init=None):
        """
        Initialize the base xapp, then the dispatch table and the
        default healthcheck and configuration handlers.
        """
        # init base
        super().__init__(
            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
        )

        # setup callbacks
        self._default_handler = default_handler
        self._config_handler = config_handler
        self._dispatch = {}  # message type -> handler, filled by register_callback

        # used for thread control
        self._keep_going = True

        # register a default healthcheck handler
        # this default checks that rmr is working and SDL is working
        # the user can override this and register their own handler
        # if they wish since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            healthy = self.healthcheck()
            payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP)
            # rmr_rts does not free the buffer; the handler owns it.
            self.rmr_free(sbuf)

        self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ)

        # define a default configuration-change handler if none was provided.
        if not config_handler:
            def handle_config_change(self, config):
                self.logger.debug("xapp_frame: default config handler invoked")

            self._config_handler = handle_config_change

        # call the config handler at startup if prereqs were met
        if self._inotify:
            with open(self._config_path) as json_file:
                data = json.load(json_file)
            self.logger.debug("run: invoking config handler at start")
            self._config_handler(self, data)

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            a function with the signature (summary, sbuf) to be called
            when a message of type message_type is received
        summary: dict
            the rmr message summary
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer. The user must call free on this when done.
        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
        """
        This function should be called when the reactive Xapp is ready to start.
        After start, the Xapp's handlers will be called on received messages.

        Parameters
        ----------
        thread: bool (optional, default is False)
            If False, execution is not returned and the framework loops forever.
            If True, a thread is started to run the queue read/dispatch loop
            and execution is returned to caller; the thread can be stopped
            by calling the .stop() method.

        rmr_timeout: integer (optional, default is 5 seconds)
            Length of time to wait for an RMR message to arrive.

        inotify_timeout: integer (optional, default is 0 seconds)
            Length of time to wait for an inotify event to arrive.
        """

        def loop():
            while self._keep_going:

                # poll RMR
                try:
                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
                    # dispatch
                    func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
                    if not func:
                        func = self._default_handler
                    self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
                    func(self, summary, sbuf)
                except queue.Empty:
                    # the get timed out; fall through to the config poll
                    pass

                # poll configuration file watcher
                try:
                    events = self.config_check(timeout=inotify_timeout)
                    for event in events:
                        with open(self._config_path) as json_file:
                            data = json.load(json_file)
                        self.logger.debug("run: invoking config handler on change event {}".format(event))
                        self._config_handler(self, data)
                except Exception as error:
                    # A failing config handler must not end the dispatch loop.
                    self.logger.error("run: configuration handler failed: {}".format(error))

        if thread:
            Thread(target=loop).start()
        else:
            loop()

    def stop(self):
        """
        Sets the flag to end the dispatch loop.
        """
        super().stop()
        self.logger.debug("Setting flag to end framework work loop.")
        self._keep_going = False
class Xapp(_BaseXapp):
    """
    Represents a generic Xapp where the client provides a single function
    for the framework to call at startup time (instead of providing callback
    functions by message type). The Xapp writer must implement and provide a
    function with a loop-forever construct similar to the `run` function in
    the `RMRXapp` class. That function should poll to retrieve RMR messages
    and dispatch them appropriately, poll for configuration changes, etc.

    Parameters
    ----------
    entrypoint: function
        This function is called when the Xapp class's run method is invoked.
        The function signature must be just function(self)
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Parameters
        ----------
        entrypoint: function
            Stored and invoked later by run().

        For the other parameters, see class _BaseXapp.
        """
        # init base
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the general Xapp is ready to start.
        """
        self._entrypoint(self)

    # there is no need for stop currently here (base has, and nothing
    # special to do here)