feb5d664ef9cc4a1f8a34a2451d2ad078204b369
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 import time
26 from threading import Thread
27 from typing import List, Set
28
29 import inotify_simple
30 from mdclogpy import Logger
31
32 from ricxappframe import xapp_rmr
33 from ricxappframe.constants import sdl_namespaces
34 from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
35 from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
36 from ricxappframe.rmr import rmr
37 from ricxappframe.util.constants import Constants
38 from ricxappframe.xapp_sdl import SDLWrapper
39 import requests
40
41
42 class _BaseXapp:
43     """
44     This class initializes RMR, starts a thread that checks for incoming
45     messages, provisions an SDL object and optionally creates a
46     config-file watcher.  This private base class should not be
47     instantiated by clients directly, but it defines many public methods
48     that may be used by clients.
49
50     If environment variable CONFIG_FILE is defined, and that variable
51     contains a path to an existing file, a watcher is defined to monitor
52     modifications (writes) to that file using the Linux kernel's inotify
53     feature. The watcher must be polled by calling method
54     config_check().
55
56     Parameters
57     ----------
58     rmr_port: int (optional, default is 4562)
59         Port on which the RMR library listens for incoming messages.
60
61     rmr_wait_for_ready: bool (optional, default is True)
62         If this is True, then init waits until RMR is ready to send,
63         which includes having a valid routing file. This can be set
64         to False if the client wants to *receive only*.
65
66     use_fake_sdl: bool (optional, default is False)
67         if this is True, it uses the DBaaS "fake dict backend" instead
68         of Redis or other backends. Set this to True when developing
69         an xapp or during unit testing to eliminate the need for DBaaS.
70
71     post_init: function (optional, default is None)
72         Runs this user-provided function at the end of the init method;
73         its signature should be post_init(self)
74     """
75
76     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
77         """
78         Documented in the class comment.
79         """
80         # PUBLIC, can be used by xapps using self.(name):
81         self.logger = Logger(name=__name__)
82
83         # Start rmr rcv thread
84         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
85         self._mrc = self._rmr_loop.mrc  # for convenience
86
87         # SDL
88         self.sdl = SDLWrapper(use_fake_sdl)
89
90         # Config
91         # The environment variable specifies the path to the Xapp config file
92         self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None)
93         if self._config_path and os.path.isfile(self._config_path):
94             self._inotify = inotify_simple.INotify()
95             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
96             self.logger.debug("__init__: watching config file {}".format(self._config_path))
97         else:
98             self._inotify = None
99             self.logger.warning("__init__: NOT watching any config file")
100
101         # used for thread control of Registration of Xapp
102         self._keep_registration = True
103
104         # configuration data  for xapp registration and deregistration
105         self._config_data = None
106         if self._config_path and os.path.isfile(self._config_path):
107             with open(self._config_path) as json_file:
108                 self._config_data = json.load(json_file)
109         else:
110             self._keep_registration = False
111             self.logger.error("__init__: Cannot Read config file for xapp Registration")
112             self._config_data = {}
113
114         Thread(target=self.registerXapp).start()
115
116         # run the optionally provided user post init
117         if post_init:
118             post_init(self)
119
120     def get_service(self, host, service):
121         """
122         To find the url for connecting to the service
123
124         Parameters
125         ----------
126         host: string
127             defines the hostname in the url
128         service: string
129             defines the servicename in the url
130
131         Returns
132         -------
133         string
134             url for the service
135         """
136         app_namespace = self._config_data.get("APP_NAMESPACE")
137         if app_namespace is None:
138             app_namespace = Constants.DEFAULT_XAPP_NS
139         self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace))
140         if app_namespace is not None and host is not None:
141             svc = service.format(app_namespace.upper(), host.upper())
142             urlkey = svc.replace("-", "_")
143             url = os.environ.get(urlkey).split("//")
144             self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url))
145             if len(url) > 1:
146                 return url[1]
147         return ""
148
149     def do_post(self, plt_namespace, url, msg):
150         """
151         registration of the xapp using the url and json msg
152
153         Parameters
154         ----------
155         plt_namespace: string
156             platform namespace where the xapp is running
157         url: string
158             url for xapp registration
159         msg: string
160             json msg containing the xapp details
161
162         Returns
163         -------
164         bool
165             whether or not the xapp is registered
166         """
167         if url is None:
168             self.logger.error("url is empty ")
169             return False
170         if plt_namespace is None:
171             self.logger.error("plt_namespace is empty")
172             return False
173         try:
174             request_url = url.format(plt_namespace, plt_namespace)
175             resp = requests.post(request_url, json=msg)
176             self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
177             self.logger.debug("Response Text : {}".format(resp.text))
178             return resp.status_code == 200 or resp.status_code == 201
179         except requests.exceptions.RequestException as err:
180             self.logger.error("Error : {}".format(err))
181             return format(err)
182         except requests.exceptions.HTTPError as errh:
183             self.logger.error("Http Error: {}".format(errh))
184             return errh
185         except requests.exceptions.ConnectionError as errc:
186             self.logger.error("Error Connecting: {}".format(errc))
187             return errc
188         except requests.exceptions.Timeout as errt:
189             self.logger.error("Timeout Error: {}".format(errt))
190             return errt
191
192     def register(self):
193         """
194             function to registers the xapp
195
196         Returns
197         -------
198         bool
199             whether or not the xapp is registered
200         """
201         hostname = os.environ.get("HOSTNAME")
202         xappname = self._config_data.get("name")
203         xappversion = self._config_data.get("version")
204         pltnamespace = os.environ.get("PLT_NAMESPACE")
205         if pltnamespace is None:
206             pltnamespace = Constants.DEFAULT_PLT_NS
207         self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format(
208             hostname, xappname, xappversion, pltnamespace))
209
210         http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP)
211         rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR)
212         if http_endpoint == "" or rmr_endpoint == "":
213             self.logger.error(
214                 "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint,
215                                                                                               rmr_endpoint))
216             return False
217         self.logger.debug(
218             "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint "
219             ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint,
220                                           self._config_data.get("CONFIG_PATH")))
221         request_string = {
222             "appName": hostname,
223             "appVersion": xappversion,
224             "configPath": "",
225             "appInstanceName": xappname,
226             "httpEndpoint": http_endpoint,
227             "rmrEndpoint": rmr_endpoint,
228             "config": json.dumps(self._config_data)
229         }
230         self.logger.info("REQUEST STRING :{}".format(request_string))
231         return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string)
232
233     def registerXapp(self):
234         """
235             registers the xapp
236         """
237         retries = 5
238         while self._keep_registration and retries > 0:
239             time.sleep(2)
240             retries = retries-1
241             # checking for rmr/sdl/xapp health
242             healthy = self.healthcheck()
243             if not healthy:
244                 self.logger.warning(
245                     "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
246                 continue
247
248             self.logger.debug("Application='{}'  is now up and ready, continue with registration ...".format(
249                 self._config_data.get("name")))
250             if self.register():
251                 self.logger.debug("Registration done, proceeding with startup ...")
252                 break
253
254     def deregister(self):
255         """
256             Deregisters the xapp
257
258         Returns
259         -------
260         bool
261             whether or not the xapp is registered
262         """
263         healthy = self.healthcheck()
264         if not healthy:
265             self.logger.error("RMR or SDL or xapp == Not Healthy")
266             return None
267         if self._config_data is None:
268             return None
269         name = os.environ.get("HOSTNAME")
270         xappname = self._config_data.get("name")
271         pltnamespace = os.environ.get("PLT_NAMESPACE")
272         if pltnamespace is None:
273             pltnamespace = Constants.DEFAULT_PLT_NS
274         request_string = {
275                 "appName": name,
276                 "appInstanceName": xappname,
277         }
278
279         return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string)
280
281     def xapp_shutdown(self):
282         """
283              Deregisters the xapp while shutting down
284         """
285         self.deregister()
286         self.logger.debug("Wait for xapp to get unregistered")
287         time.sleep(10)
288
289     # Public rmr methods
290
291     def rmr_get_messages(self):
292         """
293         Returns a generator iterable over all items in the queue that
294         have not yet been read by the client xapp. Each item is a tuple
295         (S, sbuf) where S is a message summary dict and sbuf is the raw
296         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
297         finished with each sbuf to prevent memory leaks!
298         """
299         while not self._rmr_loop.rcv_queue.empty():
300             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
301             yield (summary, sbuf)
302
303     def rmr_send(self, payload, mtype, retries=100):
304         """
305         Allocates a buffer, sets payload and mtype, and sends
306
307         Parameters
308         ----------
309         payload: bytes
310             payload to set
311         mtype: int
312             message type
313         retries: int (optional)
314             Number of times to retry at the application level before excepting RMRFailure
315
316         Returns
317         -------
318         bool
319             whether or not the send worked after retries attempts
320         """
321         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True,
322                                  mtype=mtype)
323
324         for _ in range(retries):
325             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
326             if sbuf.contents.state == 0:
327                 self.rmr_free(sbuf)
328                 return True
329
330         self.rmr_free(sbuf)
331         return False
332
333     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
334         """
335         Allows the xapp to return to sender, possibly adjusting the
336         payload and message type before doing so.  This does NOT free
337         the sbuf for the caller as the caller may wish to perform
338         multiple rts per buffer. The client needs to free.
339
340         Parameters
341         ----------
342         sbuf: ctypes c_void_p
343              Pointer to an rmr message buffer
344         new_payload: bytes (optional)
345             New payload to set
346         new_mtype: int (optional)
347             New message type (replaces the received message)
348         retries: int (optional, default 100)
349             Number of times to retry at the application level
350
351         Returns
352         -------
353         bool
354             whether or not the send worked after retries attempts
355         """
356         for _ in range(retries):
357             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
358             if sbuf.contents.state == 0:
359                 return True
360
361         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
362         return False
363
364     def rmr_free(self, sbuf):
365         """
366         Frees an rmr message buffer after use
367
368         Note: this does not need to be a class method, self is not
369         used. However if we break it out as a function we need a home
370         for it.
371
372         Parameters
373         ----------
374         sbuf: ctypes c_void_p
375              Pointer to an rmr message buffer
376         """
377         rmr.rmr_free_msg(sbuf)
378
379     # Convenience (pass-thru) function for invoking SDL.
380
381     def sdl_set(self, namespace, key, value, usemsgpack=True):
382         """
383         ** Deprecate Warning **
384         ** Will be removed in a future function **
385
386         Stores a key-value pair to SDL, optionally serializing the value
387         to bytes using msgpack.
388
389         Parameters
390         ----------
391         namespace: string
392             SDL namespace
393         key: string
394             SDL key
395         value:
396             Object or byte array to store.  See the `usemsgpack` parameter.
397         usemsgpack: boolean (optional, default is True)
398             Determines whether the value is serialized using msgpack before storing.
399             If usemsgpack is True, the msgpack function `packb` is invoked
400             on the value to yield a byte array that is then sent to SDL.
401             Stated differently, if usemsgpack is True, the value can be anything
402             that is serializable by msgpack.
403             If usemsgpack is False, the value must be bytes.
404         """
405         self.sdl.set(namespace, key, value, usemsgpack)
406
407     def sdl_get(self, namespace, key, usemsgpack=True):
408         """
409         ** Deprecate Warning **
410         ** Will be removed in a future function **
411
412         Gets the value for the specified namespace and key from SDL,
413         optionally deserializing stored bytes using msgpack.
414
415         Parameters
416         ----------
417         namespace: string
418             SDL namespace
419         key: string
420             SDL key
421         usemsgpack: boolean (optional, default is True)
422             If usemsgpack is True, the byte array stored by SDL is deserialized
423             using msgpack to yield the original object that was stored.
424             If usemsgpack is False, the byte array stored by SDL is returned
425             without further processing.
426
427         Returns
428         -------
429         Value
430             See the usemsgpack parameter for an explanation of the returned value type.
431             Answers None if the key is not found.
432         """
433         return self.sdl.get(namespace, key, usemsgpack)
434
435     def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
436         """
437         ** Deprecate Warning **
438         ** Will be removed in a future function **
439
440         Gets all key-value pairs in the specified namespace
441         with keys that start with the specified prefix,
442         optionally deserializing stored bytes using msgpack.
443
444         Parameters
445         ----------
446         nnamespaces: string
447            SDL namespace
448         prefix: string
449             the key prefix
450         usemsgpack: boolean (optional, default is True)
451             If usemsgpack is True, the byte array stored by SDL is deserialized
452             using msgpack to yield the original value that was stored.
453             If usemsgpack is False, the byte array stored by SDL is returned
454             without further processing.
455
456         Returns
457         -------
458         Dictionary of key-value pairs
459             Each key has the specified prefix.
460             The value object (its type) depends on the usemsgpack parameter,
461             but is either a Python object or raw bytes as discussed above.
462             Answers an empty dictionary if no keys matched the prefix.
463         """
464         return self.sdl.find_and_get(namespace, prefix, usemsgpack)
465
466     def sdl_delete(self, namespace, key):
467         """
468         ** Deprecate Warning **
469         ** Will be removed in a future function **
470
471         Deletes the key-value pair with the specified key in the specified namespace.
472
473         Parameters
474         ----------
475         namespace: string
476            SDL namespace
477         key: string
478             SDL key
479         """
480         self.sdl.delete(namespace, key)
481
482     def _get_rnib_info(self, node_type):
483         """
484         Since the difference between get_list_gnb_ids and get_list_enb_ids is only note-type,
485         this function extracted from the duplicated logic.
486
487         Parameters
488         ----------
489         node_type: string
490            Type of node. This is EnumDescriptor.
491            Available node types
492            - UNKNOWN
493            - ENG
494            - GNB
495
496         Returns
497         -------
498             List: (NbIdentity)
499
500         Raises
501         -------
502             SdlTypeError: If function's argument is of an inappropriate type.
503             NotConnected: If SDL is not connected to the backend data storage.
504             RejectedByBackend: If backend data storage rejects the request.
505             BackendError: If the backend data storage fails to process the request.
506         """
507         nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
508         ret: List[NbIdentity] = []
509         for nbid_string in nbid_strings:
510             nbid = NbIdentity()
511             nbid.ParseFromString(nbid_string)
512             ret.append(nbid)
513         return ret
514
515     def get_list_gnb_ids(self):
516         """
517         Retrieves the list of gNodeb identity entities
518
519         gNodeb information is stored in SDL by E2Manager. Therefore, gNode information
520         is stored in SDL's `e2Manager` namespace as protobuf serialized.
521
522         Returns
523         -------
524             List: (NbIdentity)
525
526         Raises
527         -------
528             SdlTypeError: If function's argument is of an inappropriate type.
529             NotConnected: If SDL is not connected to the backend data storage.
530             RejectedByBackend: If backend data storage rejects the request.
531             BackendError: If the backend data storage fails to process the request.
532         """
533         return self._get_rnib_info(Node.Type.Name(Node.Type.GNB))
534
535     def get_list_enb_ids(self):
536         """
537         Retrieves the list of eNodeb identity entities
538
539         eNodeb information is stored in SDL by E2Manager. Therefore, eNode information
540         is stored in SDL's `e2Manager` namespace as protobuf serialized.
541
542         Returns
543         -------
544             List: (NbIdentity)
545
546         Raises
547         ------
548             SdlTypeError: If function's argument is of an inappropriate type.
549             NotConnected: If SDL is not connected to the backend data storage.
550             RejectedByBackend: If backend data storage rejects the request.
551             BackendError: If the backend data storage fails to process the request.
552         """
553         return self._get_rnib_info(Node.Type.Name(Node.Type.ENB))
554
555     # Health
556
557     def healthcheck(self):
558         """
559         this needs to be understood how this is supposed to work
560         """
561         return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
562
563     # Convenience function for discovering config change events
564
565     def config_check(self, timeout=0):
566         """
567         Checks the watcher for configuration-file events. The watcher
568         prerequisites and event mask are documented in __init__().
569
570         Parameters
571         ----------
572         timeout: int (optional)
573             Number of seconds to wait for a configuration-file event, default 0.
574
575         Returns
576         -------
577         List of Events, possibly empty
578             An event is a tuple with objects wd, mask, cookie and name.
579             For example::
580
581                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
582
583         """
584         if not self._inotify:
585             return []
586         events = self._inotify.read(timeout=timeout)
587         return list(events)
588
589     def stop(self):
590         """
591         cleans up and stops the xapp rmr thread (currently). This is
592         critical for unit testing as pytest will never return if the
593         thread is running.
594
595         TODO: can we register a ctrl-c handler so this gets called on
596         ctrl-c? Because currently two ctrl-c are needed to stop.
597         """
598
599         self.xapp_shutdown()
600
601         self._rmr_loop.stop()
602
603
604 # Public classes that Xapp writers should instantiate or subclass
605 # to implement an Xapp.
606
607
608 class RMRXapp(_BaseXapp):
609     """
610     Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
611     only performs an action when a message is received.  Clients should
612     invoke the run method, which has a loop that waits for RMR messages
613     and calls the appropriate client-registered consume callback on each.
614
615     If environment variable CONFIG_FILE is defined, and that variable
616     contains a path to an existing file, this class polls a watcher
617     defined on that file to detect file-write events, and invokes a
618     configuration-change handler on each event. The handler is also
619     invoked at startup.  If no handler function is supplied to the
620     constructor, this class defines a default handler that only logs a
621     message.
622
623     Parameters
624     ----------
625     default_handler: function
626         A function with the signature (summary, sbuf) to be called when a
627         message type is received for which no other handler is registered.
628     default_handler argument summary: dict
629         The RMR message summary, a dict of key-value pairs
630     default_handler argument sbuf: ctypes c_void_p
631         Pointer to an RMR message buffer. The user must call free on this when done.
632     config_handler: function (optional, default is documented above)
633         A function with the signature (json) to be called at startup and each time
634         a configuration-file change event is detected. The JSON object is read from
635         the configuration file, if the prerequisites are met.
636     config_handler argument json: dict
637         The contents of the configuration file, parsed as JSON.
638     rmr_port: integer (optional, default is 4562)
639         Initialize RMR to listen on this port
640     rmr_wait_for_ready: boolean (optional, default is True)
641         Wait for RMR to signal ready before starting the dispatch loop
642     use_fake_sdl: boolean (optional, default is False)
643         Use an in-memory store instead of the real SDL service
644     post_init: function (optional, default None)
645         Run this function after the app initializes and before the dispatch loop starts;
646         its signature should be post_init(self)
647     """
648
649     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False,
650                  post_init=None):
651         """
652         Also see _BaseXapp
653         """
654         # init base
655         super().__init__(
656             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
657         )
658
659         # setup callbacks
660         self._default_handler = default_handler
661         self._config_handler = config_handler
662         self._dispatch = {}
663
664         # used for thread control
665         self._keep_going = True
666
667         # register a default healthcheck handler
668         # this default checks that rmr is working and SDL is working
669         # the user can override this and register their own handler
670         # if they wish since the "last registered callback wins".
671         def handle_healthcheck(self, summary, sbuf):
672             healthy = self.healthcheck()
673             payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
674             self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP)
675             self.rmr_free(sbuf)
676
677         self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ)
678
679         # define a default configuration-change handler if none was provided.
680         if not config_handler:
681             def handle_config_change(self, config):
682                 self.logger.debug("xapp_frame: default config handler invoked")
683
684             self._config_handler = handle_config_change
685
686         # call the config handler at startup if prereqs were met
687         if self._inotify:
688             with open(self._config_path) as json_file:
689                 data = json.load(json_file)
690             self.logger.debug("run: invoking config handler at start")
691             self._config_handler(self, data)
692
693     def register_callback(self, handler, message_type):
694         """
695         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
696
697         Parameters
698         ----------
699         handler: function
700             a function with the signature (summary, sbuf) to be called
701             when a message of type message_type is received
702         summary: dict
703             the rmr message summary
704         sbuf: ctypes c_void_p
705             Pointer to an rmr message buffer. The user must call free on this when done.
706
707         message:type: int
708             the message type to look for
709
710         Note if this method is called multiple times for a single message type, the "last one wins".
711         """
712         self._dispatch[message_type] = handler
713
714     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
715         """
716         This function should be called when the reactive Xapp is ready to start.
717         After start, the Xapp's handlers will be called on received messages.
718
719         Parameters
720         ----------
721         thread: bool (optional, default is False)
722             If False, execution is not returned and the framework loops forever.
723             If True, a thread is started to run the queue read/dispatch loop
724             and execution is returned to caller; the thread can be stopped
725             by calling the .stop() method.
726
727         rmr_timeout: integer (optional, default is 5 seconds)
728             Length of time to wait for an RMR message to arrive.
729
730         inotify_timeout: integer (optional, default is 0 seconds)
731             Length of time to wait for an inotify event to arrive.
732         """
733
734         def loop():
735             while self._keep_going:
736
737                 # poll RMR
738                 try:
739                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
740                     # dispatch
741                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
742                     if not func:
743                         func = self._default_handler
744                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
745                     func(self, summary, sbuf)
746                 except queue.Empty:
747                     # the get timed out
748                     pass
749
750                 # poll configuration file watcher
751                 try:
752                     events = self.config_check(timeout=inotify_timeout)
753                     for event in events:
754                         with open(self._config_path) as json_file:
755                             data = json.load(json_file)
756                         self.logger.debug("run: invoking config handler on change event {}".format(event))
757                         self._config_handler(self, data)
758                 except Exception as error:
759                     self.logger.error("run: configuration handler failed: {}".format(error))
760
761         if thread:
762             Thread(target=loop).start()
763         else:
764             loop()
765
766     def stop(self):
767         """
768         Sets the flag to end the dispatch loop.
769         """
770         super().stop()
771         self.logger.debug("Setting flag to end framework work loop.")
772         self._keep_going = False
773
774
775 class Xapp(_BaseXapp):
776     """
777     Represents a generic Xapp where the client provides a single function
778     for the framework to call at startup time (instead of providing callback
779     functions by message type). The Xapp writer must implement and provide a
780     function with a loop-forever construct similar to the `run` function in
781     the `RMRXapp` class.  That function should poll to retrieve RMR messages
782     and dispatch them appropriately, poll for configuration changes, etc.
783
784     Parameters
785     ----------
786     entrypoint: function
787         This function is called when the Xapp class's run method is invoked.
788         The function signature must be just function(self)
789     rmr_port: integer (optional, default is 4562)
790         Initialize RMR to listen on this port
791     rmr_wait_for_ready: boolean (optional, default is True)
792         Wait for RMR to signal ready before starting the dispatch loop
793     use_fake_sdl: boolean (optional, default is False)
794         Use an in-memory store instead of the real SDL service
795     """
796
797     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
798         """
799         Parameters
800         ----------
801
802         For the other parameters, see class _BaseXapp.
803         """
804         # init base
805         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
806         self._entrypoint = entrypoint
807
808     def run(self):
809         """
810         This function should be called when the general Xapp is ready to start.
811         """
812         self._entrypoint(self)
813
814     # there is no need for stop currently here (base has, and nothing
815     # special to do here)