Merge "Add E2AP package that supports asn1 encoding/decoding function for E2AP."
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 import time
26 from threading import Thread
27 from typing import List, Set
28
29 import inotify_simple
30 from mdclogpy import Logger
31
32 from ricxappframe import xapp_rmr
33 from ricxappframe.constants import sdl_namespaces
34 from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
35 from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
36 from ricxappframe.rmr import rmr
37 from ricxappframe.xapp_sdl import SDLWrapper
38 import requests
39 # message-type constants
40 RIC_HEALTH_CHECK_REQ = 100
41 RIC_HEALTH_CHECK_RESP = 101
42
43 # environment variable with path to configuration file
44 CONFIG_FILE_ENV = "CONFIG_FILE"
45 CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
46
47
48 class _BaseXapp:
49     """
50     This class initializes RMR, starts a thread that checks for incoming
51     messages, provisions an SDL object and optionally creates a
52     config-file watcher.  This private base class should not be
53     instantiated by clients directly, but it defines many public methods
54     that may be used by clients.
55
56     If environment variable CONFIG_FILE is defined, and that variable
57     contains a path to an existing file, a watcher is defined to monitor
58     modifications (writes) to that file using the Linux kernel's inotify
59     feature. The watcher must be polled by calling method
60     config_check().
61
62     Parameters
63     ----------
64     rmr_port: int (optional, default is 4562)
65         Port on which the RMR library listens for incoming messages.
66
67     rmr_wait_for_ready: bool (optional, default is True)
68         If this is True, then init waits until RMR is ready to send,
69         which includes having a valid routing file. This can be set
70         to False if the client wants to *receive only*.
71
72     use_fake_sdl: bool (optional, default is False)
73         if this is True, it uses the DBaaS "fake dict backend" instead
74         of Redis or other backends. Set this to True when developing
75         an xapp or during unit testing to eliminate the need for DBaaS.
76
77     post_init: function (optional, default is None)
78         Runs this user-provided function at the end of the init method;
79         its signature should be post_init(self)
80     """
81
82     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
83         """
84         Documented in the class comment.
85         """
86         # PUBLIC, can be used by xapps using self.(name):
87         self.logger = Logger(name=__name__)
88
89         # Start rmr rcv thread
90         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
91         self._mrc = self._rmr_loop.mrc  # for convenience
92
93         # SDL
94         self.sdl = SDLWrapper(use_fake_sdl)
95
96         # Config
97         # The environment variable specifies the path to the Xapp config file
98         self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
99         if self._config_path and os.path.isfile(self._config_path):
100             self._inotify = inotify_simple.INotify()
101             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
102             self.logger.debug("__init__: watching config file {}".format(self._config_path))
103         else:
104             self._inotify = None
105             self.logger.warning("__init__: NOT watching any config file")
106
107         # used for thread control of Registration of Xapp
108         self._keep_registration = True
109
110         # configuration data  for xapp registration and deregistration
111         self._config_data = None
112         self._configfile_path = os.environ.get(CONFIG_FILE_PATH, None)
113         if self._configfile_path and os.path.isfile(self._configfile_path):
114             with open(self._configfile_path) as json_file:
115                 self._config_data = json.load(json_file)
116         else:
117             self._keep_registration = False
118             self.logger.warning("__init__: Cannot Read config file for xapp Registration")
119
120         Thread(target=self.registerXapp).start()
121
122         # run the optionally provided user post init
123         if post_init:
124             post_init(self)
125
126     def get_service(self, host, service):
127         """
128         To find the url for connecting to the service
129
130         Parameters
131         ----------
132         host: string
133             defines the hostname in the url
134         service: string
135             defines the servicename in the url
136
137         Returns
138         -------
139         string
140             url for the service
141         """
142         app_namespace = self._config_data.get("APP_NAMESPACE")
143         if app_namespace == "":
144             app_namespace = self._config_data.get("DEFAULT_XAPP_NS")
145         svc = service.format(app_namespace.upper(), host.upper())
146         url = svc.replace("-", "_").split("//")
147
148         if len(url) > 1:
149             return url[1]
150         return ""
151
152     def do_post(self, plt_namespace, url, msg):
153         """
154         registration of the xapp using the url and json msg
155
156         Parameters
157         ----------
158         plt_namespace: string
159             platform namespace where the xapp is running
160         url: string
161             url for xapp registration
162         msg: string
163             json msg containing the xapp details
164
165         Returns
166         -------
167         bool
168             whether or not the xapp is registered
169         """
170         try:
171             request_url = url.format(plt_namespace, plt_namespace)
172             resp = requests.post(request_url, json=msg)
173             self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
174             return resp.status_code == 200 or resp.status_code == 201
175         except requests.exceptions.RequestException as err:
176             self.logger.error("Error : {}".format(err))
177             return format(err)
178         except requests.exceptions.HTTPError as errh:
179             self.logger.error("Http Error: {}".format(errh))
180             return errh
181         except requests.exceptions.ConnectionError as errc:
182             self.logger.error("Error Connecting: {}".format(errc))
183             return errc
184         except requests.exceptions.Timeout as errt:
185             self.logger.error("Timeout Error: {}".format(errt))
186             return errt
187
188     def register(self):
189         """
190             function to registers the xapp
191
192         Returns
193         -------
194         bool
195             whether or not the xapp is registered
196         """
197         hostname = self._config_data.get("hostname")
198         xappname = self._config_data.get("name")
199         xappversion = self._config_data.get("version")
200         pltnamespace = self._config_data.get("PLT_NAMESPACE")
201         if pltnamespace == "":
202             pltnamespace = self._config_data.get("DEFAULT_PLT_NS")
203         http_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_HTTP"))
204         rmr_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_RMR"))
205         if http_endpoint == "" or rmr_endpoint == "":
206             self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
207             return None
208         try:
209             request_string = {
210                 "appName": hostname,
211                 "httpEndpoint": http_endpoint,
212                 "rmrEndpoint": rmr_endpoint,
213                 "appInstanceName": xappname,
214                 "appVersion": xappversion,
215                 "configPath": self._config_data.get("CONFIG_PATH")
216             }
217             request_body = json.dumps(request_string)
218         except TypeError:
219             self.logger.error("Unable to serialize the object")
220             return "Error searializing the object"
221
222         return self.do_post(pltnamespace, self._config_data.get("REGISTER_PATH"), request_body)
223
224     def registerXapp(self):
225         """
226             registers the xapp
227         """
228         while self._keep_registration:
229             time.sleep(5)
230             # checking for rmr/sdl/xapp health
231             healthy = self.healthcheck()
232             if not healthy:
233                 self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
234                 continue
235
236             self.logger.debug("Application='{}'  is now up and ready, continue with registration ...".format(self._config_data.get("name")))
237             self.register()
238             self.logger.debug("Registration done, proceeding with startup ...")
239             break
240
241     def deregister(self):
242         """
243             Deregisters the xapp
244
245         Returns
246         -------
247         bool
248             whether or not the xapp is registered
249         """
250         healthy = self.healthcheck()
251         if not healthy:
252             self.logger.error("RMR or SDL or xapp == Not Healthy")
253             return None
254         if self._config_data is None:
255             return None
256         name = self._config_data.get("hostname")
257         xappname = self._config_data.get("name")
258         pltnamespace = self._config_data.get("PLT_NAMESPACE")
259         if pltnamespace == "":
260             pltnamespace = self._config_data.get("PLT_NAMESPACE")
261         try:
262             request_string = {
263                 "appName": name,
264                 "appInstanceName": xappname,
265             }
266             request_body = json.dumps(request_string)
267         except TypeError:
268             self.logger.error("Error Serializing the object")
269             return "Error serializing the object"
270
271         return self.do_post(pltnamespace, self._config_data.get("DEREGISTER_PATH"), request_body)
272
273     def xapp_shutdown(self):
274         """
275              Deregisters the xapp while shutting down
276         """
277         self.deregister()
278         self.logger.debug("Wait for xapp to get unregistered")
279         time.sleep(10)
280
281     # Public rmr methods
282
283     def rmr_get_messages(self):
284         """
285         Returns a generator iterable over all items in the queue that
286         have not yet been read by the client xapp. Each item is a tuple
287         (S, sbuf) where S is a message summary dict and sbuf is the raw
288         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
289         finished with each sbuf to prevent memory leaks!
290         """
291         while not self._rmr_loop.rcv_queue.empty():
292             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
293             yield (summary, sbuf)
294
295     def rmr_send(self, payload, mtype, retries=100):
296         """
297         Allocates a buffer, sets payload and mtype, and sends
298
299         Parameters
300         ----------
301         payload: bytes
302             payload to set
303         mtype: int
304             message type
305         retries: int (optional)
306             Number of times to retry at the application level before excepting RMRFailure
307
308         Returns
309         -------
310         bool
311             whether or not the send worked after retries attempts
312         """
313         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
314
315         for _ in range(retries):
316             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
317             if sbuf.contents.state == 0:
318                 self.rmr_free(sbuf)
319                 return True
320
321         self.rmr_free(sbuf)
322         return False
323
324     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
325         """
326         Allows the xapp to return to sender, possibly adjusting the
327         payload and message type before doing so.  This does NOT free
328         the sbuf for the caller as the caller may wish to perform
329         multiple rts per buffer. The client needs to free.
330
331         Parameters
332         ----------
333         sbuf: ctypes c_void_p
334              Pointer to an rmr message buffer
335         new_payload: bytes (optional)
336             New payload to set
337         new_mtype: int (optional)
338             New message type (replaces the received message)
339         retries: int (optional, default 100)
340             Number of times to retry at the application level
341
342         Returns
343         -------
344         bool
345             whether or not the send worked after retries attempts
346         """
347         for _ in range(retries):
348             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
349             if sbuf.contents.state == 0:
350                 return True
351
352         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
353         return False
354
355     def rmr_free(self, sbuf):
356         """
357         Frees an rmr message buffer after use
358
359         Note: this does not need to be a class method, self is not
360         used. However if we break it out as a function we need a home
361         for it.
362
363         Parameters
364         ----------
365         sbuf: ctypes c_void_p
366              Pointer to an rmr message buffer
367         """
368         rmr.rmr_free_msg(sbuf)
369
370     # Convenience (pass-thru) function for invoking SDL.
371
372     def sdl_set(self, namespace, key, value, usemsgpack=True):
373         """
374         ** Deprecate Warning **
375         ** Will be removed in a future function **
376
377         Stores a key-value pair to SDL, optionally serializing the value
378         to bytes using msgpack.
379
380         Parameters
381         ----------
382         namespace: string
383             SDL namespace
384         key: string
385             SDL key
386         value:
387             Object or byte array to store.  See the `usemsgpack` parameter.
388         usemsgpack: boolean (optional, default is True)
389             Determines whether the value is serialized using msgpack before storing.
390             If usemsgpack is True, the msgpack function `packb` is invoked
391             on the value to yield a byte array that is then sent to SDL.
392             Stated differently, if usemsgpack is True, the value can be anything
393             that is serializable by msgpack.
394             If usemsgpack is False, the value must be bytes.
395         """
396         self.sdl.set(namespace, key, value, usemsgpack)
397
398     def sdl_get(self, namespace, key, usemsgpack=True):
399         """
400         ** Deprecate Warning **
401         ** Will be removed in a future function **
402
403         Gets the value for the specified namespace and key from SDL,
404         optionally deserializing stored bytes using msgpack.
405
406         Parameters
407         ----------
408         namespace: string
409             SDL namespace
410         key: string
411             SDL key
412         usemsgpack: boolean (optional, default is True)
413             If usemsgpack is True, the byte array stored by SDL is deserialized
414             using msgpack to yield the original object that was stored.
415             If usemsgpack is False, the byte array stored by SDL is returned
416             without further processing.
417
418         Returns
419         -------
420         Value
421             See the usemsgpack parameter for an explanation of the returned value type.
422             Answers None if the key is not found.
423         """
424         return self.sdl.get(namespace, key, usemsgpack)
425
426     def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
427         """
428         ** Deprecate Warning **
429         ** Will be removed in a future function **
430
431         Gets all key-value pairs in the specified namespace
432         with keys that start with the specified prefix,
433         optionally deserializing stored bytes using msgpack.
434
435         Parameters
436         ----------
437         nnamespaces: string
438            SDL namespace
439         prefix: string
440             the key prefix
441         usemsgpack: boolean (optional, default is True)
442             If usemsgpack is True, the byte array stored by SDL is deserialized
443             using msgpack to yield the original value that was stored.
444             If usemsgpack is False, the byte array stored by SDL is returned
445             without further processing.
446
447         Returns
448         -------
449         Dictionary of key-value pairs
450             Each key has the specified prefix.
451             The value object (its type) depends on the usemsgpack parameter,
452             but is either a Python object or raw bytes as discussed above.
453             Answers an empty dictionary if no keys matched the prefix.
454         """
455         return self.sdl.find_and_get(namespace, prefix, usemsgpack)
456
457     def sdl_delete(self, namespace, key):
458         """
459         ** Deprecate Warning **
460         ** Will be removed in a future function **
461
462         Deletes the key-value pair with the specified key in the specified namespace.
463
464         Parameters
465         ----------
466         namespace: string
467            SDL namespace
468         key: string
469             SDL key
470         """
471         self.sdl.delete(namespace, key)
472
473     def _get_rnib_info(self, node_type):
474         """
475         Since the difference between get_list_gnb_ids and get_list_enb_ids is only note-type,
476         this function extracted from the duplicated logic.
477
478         Parameters
479         ----------
480         node_type: string
481            Type of node. This is EnumDescriptor.
482            Available node types
483            - UNKNOWN
484            - ENG
485            - GNB
486
487         Returns
488         -------
489             List: (NbIdentity)
490
491         Raises
492         -------
493             SdlTypeError: If function's argument is of an inappropriate type.
494             NotConnected: If SDL is not connected to the backend data storage.
495             RejectedByBackend: If backend data storage rejects the request.
496             BackendError: If the backend data storage fails to process the request.
497         """
498         nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
499         ret: List[NbIdentity] = []
500         for nbid_string in nbid_strings:
501             nbid = NbIdentity()
502             nbid.ParseFromString(nbid_string)
503             ret.append(nbid)
504         return ret
505
506     def get_list_gnb_ids(self):
507         """
508         Retrieves the list of gNodeb identity entities
509
510         gNodeb information is stored in SDL by E2Manager. Therefore, gNode information
511         is stored in SDL's `e2Manager` namespace as protobuf serialized.
512
513         Returns
514         -------
515             List: (NbIdentity)
516
517         Raises
518         -------
519             SdlTypeError: If function's argument is of an inappropriate type.
520             NotConnected: If SDL is not connected to the backend data storage.
521             RejectedByBackend: If backend data storage rejects the request.
522             BackendError: If the backend data storage fails to process the request.
523         """
524         return self._get_rnib_info(Node.Type.Name(Node.Type.GNB))
525
526     def get_list_enb_ids(self):
527         """
528         Retrieves the list of eNodeb identity entities
529
530         eNodeb information is stored in SDL by E2Manager. Therefore, eNode information
531         is stored in SDL's `e2Manager` namespace as protobuf serialized.
532
533         Returns
534         -------
535             List: (NbIdentity)
536
537         Raises
538         ------
539             SdlTypeError: If function's argument is of an inappropriate type.
540             NotConnected: If SDL is not connected to the backend data storage.
541             RejectedByBackend: If backend data storage rejects the request.
542             BackendError: If the backend data storage fails to process the request.
543         """
544         return self._get_rnib_info(Node.Type.Name(Node.Type.ENB))
545
546     # Health
547
548     def healthcheck(self):
549         """
550         this needs to be understood how this is supposed to work
551         """
552         return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
553
554     # Convenience function for discovering config change events
555
556     def config_check(self, timeout=0):
557         """
558         Checks the watcher for configuration-file events. The watcher
559         prerequisites and event mask are documented in __init__().
560
561         Parameters
562         ----------
563         timeout: int (optional)
564             Number of seconds to wait for a configuration-file event, default 0.
565
566         Returns
567         -------
568         List of Events, possibly empty
569             An event is a tuple with objects wd, mask, cookie and name.
570             For example::
571
572                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
573
574         """
575         if not self._inotify:
576             return []
577         events = self._inotify.read(timeout=timeout)
578         return list(events)
579
580     def stop(self):
581         """
582         cleans up and stops the xapp rmr thread (currently). This is
583         critical for unit testing as pytest will never return if the
584         thread is running.
585
586         TODO: can we register a ctrl-c handler so this gets called on
587         ctrl-c? Because currently two ctrl-c are needed to stop.
588         """
589
590         self.xapp_shutdown()
591
592         self._rmr_loop.stop()
593
594
595 # Public classes that Xapp writers should instantiate or subclass
596 # to implement an Xapp.
597
598
599 class RMRXapp(_BaseXapp):
600     """
601     Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
602     only performs an action when a message is received.  Clients should
603     invoke the run method, which has a loop that waits for RMR messages
604     and calls the appropriate client-registered consume callback on each.
605
606     If environment variable CONFIG_FILE is defined, and that variable
607     contains a path to an existing file, this class polls a watcher
608     defined on that file to detect file-write events, and invokes a
609     configuration-change handler on each event. The handler is also
610     invoked at startup.  If no handler function is supplied to the
611     constructor, this class defines a default handler that only logs a
612     message.
613
614     Parameters
615     ----------
616     default_handler: function
617         A function with the signature (summary, sbuf) to be called when a
618         message type is received for which no other handler is registered.
619     default_handler argument summary: dict
620         The RMR message summary, a dict of key-value pairs
621     default_handler argument sbuf: ctypes c_void_p
622         Pointer to an RMR message buffer. The user must call free on this when done.
623     config_handler: function (optional, default is documented above)
624         A function with the signature (json) to be called at startup and each time
625         a configuration-file change event is detected. The JSON object is read from
626         the configuration file, if the prerequisites are met.
627     config_handler argument json: dict
628         The contents of the configuration file, parsed as JSON.
629     rmr_port: integer (optional, default is 4562)
630         Initialize RMR to listen on this port
631     rmr_wait_for_ready: boolean (optional, default is True)
632         Wait for RMR to signal ready before starting the dispatch loop
633     use_fake_sdl: boolean (optional, default is False)
634         Use an in-memory store instead of the real SDL service
635     post_init: function (optional, default None)
636         Run this function after the app initializes and before the dispatch loop starts;
637         its signature should be post_init(self)
638     """
639
640     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
641         """
642         Also see _BaseXapp
643         """
644         # init base
645         super().__init__(
646             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
647         )
648
649         # setup callbacks
650         self._default_handler = default_handler
651         self._config_handler = config_handler
652         self._dispatch = {}
653
654         # used for thread control
655         self._keep_going = True
656
657         # register a default healthcheck handler
658         # this default checks that rmr is working and SDL is working
659         # the user can override this and register their own handler
660         # if they wish since the "last registered callback wins".
661         def handle_healthcheck(self, summary, sbuf):
662             healthy = self.healthcheck()
663             payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
664             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
665             self.rmr_free(sbuf)
666
667         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
668
669         # define a default configuration-change handler if none was provided.
670         if not config_handler:
671             def handle_config_change(self, config):
672                 self.logger.debug("xapp_frame: default config handler invoked")
673             self._config_handler = handle_config_change
674
675         # call the config handler at startup if prereqs were met
676         if self._inotify:
677             with open(self._config_path) as json_file:
678                 data = json.load(json_file)
679             self.logger.debug("run: invoking config handler at start")
680             self._config_handler(self, data)
681
682     def register_callback(self, handler, message_type):
683         """
684         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
685
686         Parameters
687         ----------
688         handler: function
689             a function with the signature (summary, sbuf) to be called
690             when a message of type message_type is received
691         summary: dict
692             the rmr message summary
693         sbuf: ctypes c_void_p
694             Pointer to an rmr message buffer. The user must call free on this when done.
695
696         message:type: int
697             the message type to look for
698
699         Note if this method is called multiple times for a single message type, the "last one wins".
700         """
701         self._dispatch[message_type] = handler
702
703     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
704         """
705         This function should be called when the reactive Xapp is ready to start.
706         After start, the Xapp's handlers will be called on received messages.
707
708         Parameters
709         ----------
710         thread: bool (optional, default is False)
711             If False, execution is not returned and the framework loops forever.
712             If True, a thread is started to run the queue read/dispatch loop
713             and execution is returned to caller; the thread can be stopped
714             by calling the .stop() method.
715
716         rmr_timeout: integer (optional, default is 5 seconds)
717             Length of time to wait for an RMR message to arrive.
718
719         inotify_timeout: integer (optional, default is 0 seconds)
720             Length of time to wait for an inotify event to arrive.
721         """
722
723         def loop():
724             while self._keep_going:
725
726                 # poll RMR
727                 try:
728                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
729                     # dispatch
730                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
731                     if not func:
732                         func = self._default_handler
733                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
734                     func(self, summary, sbuf)
735                 except queue.Empty:
736                     # the get timed out
737                     pass
738
739                 # poll configuration file watcher
740                 try:
741                     events = self.config_check(timeout=inotify_timeout)
742                     for event in events:
743                         with open(self._config_path) as json_file:
744                             data = json.load(json_file)
745                         self.logger.debug("run: invoking config handler on change event {}".format(event))
746                         self._config_handler(self, data)
747                 except Exception as error:
748                     self.logger.error("run: configuration handler failed: {}".format(error))
749
750         if thread:
751             Thread(target=loop).start()
752         else:
753             loop()
754
755     def stop(self):
756         """
757         Sets the flag to end the dispatch loop.
758         """
759         super().stop()
760         self.logger.debug("Setting flag to end framework work loop.")
761         self._keep_going = False
762
763
764 class Xapp(_BaseXapp):
765     """
766     Represents a generic Xapp where the client provides a single function
767     for the framework to call at startup time (instead of providing callback
768     functions by message type). The Xapp writer must implement and provide a
769     function with a loop-forever construct similar to the `run` function in
770     the `RMRXapp` class.  That function should poll to retrieve RMR messages
771     and dispatch them appropriately, poll for configuration changes, etc.
772
773     Parameters
774     ----------
775     entrypoint: function
776         This function is called when the Xapp class's run method is invoked.
777         The function signature must be just function(self)
778     rmr_port: integer (optional, default is 4562)
779         Initialize RMR to listen on this port
780     rmr_wait_for_ready: boolean (optional, default is True)
781         Wait for RMR to signal ready before starting the dispatch loop
782     use_fake_sdl: boolean (optional, default is False)
783         Use an in-memory store instead of the real SDL service
784     """
785
786     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
787         """
788         Parameters
789         ----------
790
791         For the other parameters, see class _BaseXapp.
792         """
793         # init base
794         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
795         self._entrypoint = entrypoint
796
797     def run(self):
798         """
799         This function should be called when the general Xapp is ready to start.
800         """
801         self._entrypoint(self)
802
803     # there is no need for stop currently here (base has, and nothing
804     # special to do here)