Upadated Xappframework for Xapp registration/deregistration to RIC
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 import time
26 from threading import Thread
27 import inotify_simple
28 from mdclogpy import Logger
29 from ricxappframe import xapp_rmr
30 from ricxappframe.rmr import rmr
31 from ricxappframe.xapp_sdl import SDLWrapper
32 import requests
33 # message-type constants
34 RIC_HEALTH_CHECK_REQ = 100
35 RIC_HEALTH_CHECK_RESP = 101
36
37 # environment variable with path to configuration file
38 CONFIG_FILE_ENV = "CONFIG_FILE"
39 CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
40
41
42 class _BaseXapp:
43     """
44     This class initializes RMR, starts a thread that checks for incoming
45     messages, provisions an SDL object and optionally creates a
46     config-file watcher.  This private base class should not be
47     instantiated by clients directly, but it defines many public methods
48     that may be used by clients.
49
50     If environment variable CONFIG_FILE is defined, and that variable
51     contains a path to an existing file, a watcher is defined to monitor
52     modifications (writes) to that file using the Linux kernel's inotify
53     feature. The watcher must be polled by calling method
54     config_check().
55
56     Parameters
57     ----------
58     rmr_port: int (optional, default is 4562)
59         Port on which the RMR library listens for incoming messages.
60
61     rmr_wait_for_ready: bool (optional, default is True)
62         If this is True, then init waits until RMR is ready to send,
63         which includes having a valid routing file. This can be set
64         to False if the client wants to *receive only*.
65
66     use_fake_sdl: bool (optional, default is False)
67         if this is True, it uses the DBaaS "fake dict backend" instead
68         of Redis or other backends. Set this to True when developing
69         an xapp or during unit testing to eliminate the need for DBaaS.
70
71     post_init: function (optional, default is None)
72         Runs this user-provided function at the end of the init method;
73         its signature should be post_init(self)
74     """
75
76     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
77         """
78         Documented in the class comment.
79         """
80         # PUBLIC, can be used by xapps using self.(name):
81         self.logger = Logger(name=__name__)
82
83         # Start rmr rcv thread
84         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
85         self._mrc = self._rmr_loop.mrc  # for convenience
86
87         # SDL
88         self.sdl = SDLWrapper(use_fake_sdl)
89
90         # Config
91         # The environment variable specifies the path to the Xapp config file
92         self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
93         if self._config_path and os.path.isfile(self._config_path):
94             self._inotify = inotify_simple.INotify()
95             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
96             self.logger.debug("__init__: watching config file {}".format(self._config_path))
97         else:
98             self._inotify = None
99             self.logger.warning("__init__: NOT watching any config file")
100
101         # used for thread control of Registration of Xapp
102         self._keep_registration = True
103
104         # configuration data  for xapp registration and deregistration
105         self._config_data = None
106         self._configfile_path = os.environ.get(CONFIG_FILE_PATH, None)
107         if self._configfile_path and os.path.isfile(self._configfile_path):
108             with open(self._configfile_path) as json_file:
109                 self._config_data = json.load(json_file)
110         else:
111             self._keep_registration = False
112             self.logger.warning("__init__: Cannot Read config file for xapp Registration")
113
114         Thread(target=self.registerXapp).start()
115
116         # run the optionally provided user post init
117         if post_init:
118             post_init(self)
119
120     def get_service(self, host, service):
121         """
122         To find the url for connecting to the service
123
124         Parameters
125         ----------
126         host: string
127             defines the hostname in the url
128         service: string
129             defines the servicename in the url
130
131         Returns
132         -------
133         string
134             url for the service
135         """
136         app_namespace = self._config_data.get("APP_NAMESPACE")
137         if app_namespace == "":
138             app_namespace = self._config_data.get("DEFAULT_XAPP_NS")
139         svc = service.format(app_namespace.upper(), host.upper())
140         url = svc.replace("-", "_").split("//")
141
142         if len(url) > 1:
143             return url[1]
144         return ""
145
146     def do_post(self, plt_namespace, url, msg):
147         """
148         registration of the xapp using the url and json msg
149
150         Parameters
151         ----------
152         plt_namespace: string
153             platform namespace where the xapp is running
154         url: string
155             url for xapp registration
156         msg: string
157             json msg containing the xapp details
158
159         Returns
160         -------
161         bool
162             whether or not the xapp is registered
163         """
164         try:
165             request_url = url.format(plt_namespace, plt_namespace)
166             resp = requests.post(request_url, json=msg)
167             self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
168             return resp.status_code == 200 or resp.status_code == 201
169         except requests.exceptions.RequestException as err:
170             self.logger.error("Error : {}".format(err))
171             return format(err)
172         except requests.exceptions.HTTPError as errh:
173             self.logger.error("Http Error: {}".format(errh))
174             return errh
175         except requests.exceptions.ConnectionError as errc:
176             self.logger.error("Error Connecting: {}".format(errc))
177             return errc
178         except requests.exceptions.Timeout as errt:
179             self.logger.error("Timeout Error: {}".format(errt))
180             return errt
181
182     def register(self):
183         """
184             function to registers the xapp
185
186         Returns
187         -------
188         bool
189             whether or not the xapp is registered
190         """
191         hostname = self._config_data.get("hostname")
192         xappname = self._config_data.get("name")
193         xappversion = self._config_data.get("version")
194         pltnamespace = self._config_data.get("PLT_NAMESPACE")
195         if pltnamespace == "":
196             pltnamespace = self._config_data.get("DEFAULT_PLT_NS")
197         http_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_HTTP"))
198         rmr_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_RMR"))
199         if http_endpoint == "" or rmr_endpoint == "":
200             self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
201             return None
202         try:
203             request_string = {
204                 "appName": hostname,
205                 "httpEndpoint": http_endpoint,
206                 "rmrEndpoint": rmr_endpoint,
207                 "appInstanceName": xappname,
208                 "appVersion": xappversion,
209                 "configPath": self._config_data.get("CONFIG_PATH")
210             }
211             request_body = json.dumps(request_string)
212         except TypeError:
213             self.logger.error("Unable to serialize the object")
214             return "Error searializing the object"
215
216         return self.do_post(pltnamespace, self._config_data.get("REGISTER_PATH"), request_body)
217
218     def registerXapp(self):
219         """
220             registers the xapp
221         """
222         while self._keep_registration:
223             time.sleep(5)
224             # checking for rmr/sdl/xapp health
225             healthy = self.healthcheck()
226             if not healthy:
227                 self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
228                 continue
229
230             self.logger.debug("Application='{}'  is now up and ready, continue with registration ...".format(self._config_data.get("name")))
231             self.register()
232             self.logger.debug("Registration done, proceeding with startup ...")
233             break
234
235     def deregister(self):
236         """
237             Deregisters the xapp
238
239         Returns
240         -------
241         bool
242             whether or not the xapp is registered
243         """
244         healthy = self.healthcheck()
245         if not healthy:
246             self.logger.error("RMR or SDL or xapp == Not Healthy")
247             return None
248         if self._config_data is None:
249             return None
250         name = self._config_data.get("hostname")
251         xappname = self._config_data.get("name")
252         pltnamespace = self._config_data.get("PLT_NAMESPACE")
253         if pltnamespace == "":
254             pltnamespace = self._config_data.get("PLT_NAMESPACE")
255         try:
256             request_string = {
257                 "appName": name,
258                 "appInstanceName": xappname,
259             }
260             request_body = json.dumps(request_string)
261         except TypeError:
262             self.logger.error("Error Serializing the object")
263             return "Error serializing the object"
264
265         return self.do_post(pltnamespace, self._config_data.get("DEREGISTER_PATH"), request_body)
266
267     def xapp_shutdown(self):
268         """
269              Deregisters the xapp while shutting down
270         """
271         self.deregister()
272         self.logger.debug("Wait for xapp to get unregistered")
273         time.sleep(10)
274
275     # Public rmr methods
276
277     def rmr_get_messages(self):
278         """
279         Returns a generator iterable over all items in the queue that
280         have not yet been read by the client xapp. Each item is a tuple
281         (S, sbuf) where S is a message summary dict and sbuf is the raw
282         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
283         finished with each sbuf to prevent memory leaks!
284         """
285         while not self._rmr_loop.rcv_queue.empty():
286             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
287             yield (summary, sbuf)
288
289     def rmr_send(self, payload, mtype, retries=100):
290         """
291         Allocates a buffer, sets payload and mtype, and sends
292
293         Parameters
294         ----------
295         payload: bytes
296             payload to set
297         mtype: int
298             message type
299         retries: int (optional)
300             Number of times to retry at the application level before excepting RMRFailure
301
302         Returns
303         -------
304         bool
305             whether or not the send worked after retries attempts
306         """
307         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
308
309         for _ in range(retries):
310             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
311             if sbuf.contents.state == 0:
312                 self.rmr_free(sbuf)
313                 return True
314
315         self.rmr_free(sbuf)
316         return False
317
318     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
319         """
320         Allows the xapp to return to sender, possibly adjusting the
321         payload and message type before doing so.  This does NOT free
322         the sbuf for the caller as the caller may wish to perform
323         multiple rts per buffer. The client needs to free.
324
325         Parameters
326         ----------
327         sbuf: ctypes c_void_p
328              Pointer to an rmr message buffer
329         new_payload: bytes (optional)
330             New payload to set
331         new_mtype: int (optional)
332             New message type (replaces the received message)
333         retries: int (optional, default 100)
334             Number of times to retry at the application level
335
336         Returns
337         -------
338         bool
339             whether or not the send worked after retries attempts
340         """
341         for _ in range(retries):
342             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
343             if sbuf.contents.state == 0:
344                 return True
345
346         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
347         return False
348
349     def rmr_free(self, sbuf):
350         """
351         Frees an rmr message buffer after use
352
353         Note: this does not need to be a class method, self is not
354         used. However if we break it out as a function we need a home
355         for it.
356
357         Parameters
358         ----------
359         sbuf: ctypes c_void_p
360              Pointer to an rmr message buffer
361         """
362         rmr.rmr_free_msg(sbuf)
363
364     # Convenience (pass-thru) function for invoking SDL.
365
366     def sdl_set(self, namespace, key, value, usemsgpack=True):
367         """
368         ** Deprecate Warning **
369         ** Will be removed in a future function **
370
371         Stores a key-value pair to SDL, optionally serializing the value
372         to bytes using msgpack.
373
374         Parameters
375         ----------
376         namespace: string
377             SDL namespace
378         key: string
379             SDL key
380         value:
381             Object or byte array to store.  See the `usemsgpack` parameter.
382         usemsgpack: boolean (optional, default is True)
383             Determines whether the value is serialized using msgpack before storing.
384             If usemsgpack is True, the msgpack function `packb` is invoked
385             on the value to yield a byte array that is then sent to SDL.
386             Stated differently, if usemsgpack is True, the value can be anything
387             that is serializable by msgpack.
388             If usemsgpack is False, the value must be bytes.
389         """
390         self.sdl.set(namespace, key, value, usemsgpack)
391
392     def sdl_get(self, namespace, key, usemsgpack=True):
393         """
394         ** Deprecate Warning **
395         ** Will be removed in a future function **
396
397         Gets the value for the specified namespace and key from SDL,
398         optionally deserializing stored bytes using msgpack.
399
400         Parameters
401         ----------
402         namespace: string
403             SDL namespace
404         key: string
405             SDL key
406         usemsgpack: boolean (optional, default is True)
407             If usemsgpack is True, the byte array stored by SDL is deserialized
408             using msgpack to yield the original object that was stored.
409             If usemsgpack is False, the byte array stored by SDL is returned
410             without further processing.
411
412         Returns
413         -------
414         Value
415             See the usemsgpack parameter for an explanation of the returned value type.
416             Answers None if the key is not found.
417         """
418         return self.sdl.get(namespace, key, usemsgpack)
419
420     def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
421         """
422         ** Deprecate Warning **
423         ** Will be removed in a future function **
424
425         Gets all key-value pairs in the specified namespace
426         with keys that start with the specified prefix,
427         optionally deserializing stored bytes using msgpack.
428
429         Parameters
430         ----------
431         nnamespaces: string
432            SDL namespace
433         prefix: string
434             the key prefix
435         usemsgpack: boolean (optional, default is True)
436             If usemsgpack is True, the byte array stored by SDL is deserialized
437             using msgpack to yield the original value that was stored.
438             If usemsgpack is False, the byte array stored by SDL is returned
439             without further processing.
440
441         Returns
442         -------
443         Dictionary of key-value pairs
444             Each key has the specified prefix.
445             The value object (its type) depends on the usemsgpack parameter,
446             but is either a Python object or raw bytes as discussed above.
447             Answers an empty dictionary if no keys matched the prefix.
448         """
449         return self.sdl.find_and_get(namespace, prefix, usemsgpack)
450
451     def sdl_delete(self, namespace, key):
452         """
453         ** Deprecate Warning **
454         ** Will be removed in a future function **
455
456         Deletes the key-value pair with the specified key in the specified namespace.
457
458         Parameters
459         ----------
460         namespace: string
461            SDL namespace
462         key: string
463             SDL key
464         """
465         self.sdl.delete(namespace, key)
466
467     # Health
468
469     def healthcheck(self):
470         """
471         this needs to be understood how this is supposed to work
472         """
473         return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
474
475     # Convenience function for discovering config change events
476
477     def config_check(self, timeout=0):
478         """
479         Checks the watcher for configuration-file events. The watcher
480         prerequisites and event mask are documented in __init__().
481
482         Parameters
483         ----------
484         timeout: int (optional)
485             Number of seconds to wait for a configuration-file event, default 0.
486
487         Returns
488         -------
489         List of Events, possibly empty
490             An event is a tuple with objects wd, mask, cookie and name.
491             For example::
492
493                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
494
495         """
496         if not self._inotify:
497             return []
498         events = self._inotify.read(timeout=timeout)
499         return list(events)
500
501     def stop(self):
502         """
503         cleans up and stops the xapp rmr thread (currently). This is
504         critical for unit testing as pytest will never return if the
505         thread is running.
506
507         TODO: can we register a ctrl-c handler so this gets called on
508         ctrl-c? Because currently two ctrl-c are needed to stop.
509         """
510
511         self.xapp_shutdown()
512
513         self._rmr_loop.stop()
514
515
516 # Public classes that Xapp writers should instantiate or subclass
517 # to implement an Xapp.
518
519
520 class RMRXapp(_BaseXapp):
521     """
522     Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
523     only performs an action when a message is received.  Clients should
524     invoke the run method, which has a loop that waits for RMR messages
525     and calls the appropriate client-registered consume callback on each.
526
527     If environment variable CONFIG_FILE is defined, and that variable
528     contains a path to an existing file, this class polls a watcher
529     defined on that file to detect file-write events, and invokes a
530     configuration-change handler on each event. The handler is also
531     invoked at startup.  If no handler function is supplied to the
532     constructor, this class defines a default handler that only logs a
533     message.
534
535     Parameters
536     ----------
537     default_handler: function
538         A function with the signature (summary, sbuf) to be called when a
539         message type is received for which no other handler is registered.
540     default_handler argument summary: dict
541         The RMR message summary, a dict of key-value pairs
542     default_handler argument sbuf: ctypes c_void_p
543         Pointer to an RMR message buffer. The user must call free on this when done.
544     config_handler: function (optional, default is documented above)
545         A function with the signature (json) to be called at startup and each time
546         a configuration-file change event is detected. The JSON object is read from
547         the configuration file, if the prerequisites are met.
548     config_handler argument json: dict
549         The contents of the configuration file, parsed as JSON.
550     rmr_port: integer (optional, default is 4562)
551         Initialize RMR to listen on this port
552     rmr_wait_for_ready: boolean (optional, default is True)
553         Wait for RMR to signal ready before starting the dispatch loop
554     use_fake_sdl: boolean (optional, default is False)
555         Use an in-memory store instead of the real SDL service
556     post_init: function (optional, default None)
557         Run this function after the app initializes and before the dispatch loop starts;
558         its signature should be post_init(self)
559     """
560
561     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
562         """
563         Also see _BaseXapp
564         """
565         # init base
566         super().__init__(
567             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
568         )
569
570         # setup callbacks
571         self._default_handler = default_handler
572         self._config_handler = config_handler
573         self._dispatch = {}
574
575         # used for thread control
576         self._keep_going = True
577
578         # register a default healthcheck handler
579         # this default checks that rmr is working and SDL is working
580         # the user can override this and register their own handler
581         # if they wish since the "last registered callback wins".
582         def handle_healthcheck(self, summary, sbuf):
583             healthy = self.healthcheck()
584             payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
585             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
586             self.rmr_free(sbuf)
587
588         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
589
590         # define a default configuration-change handler if none was provided.
591         if not config_handler:
592             def handle_config_change(self, config):
593                 self.logger.debug("xapp_frame: default config handler invoked")
594             self._config_handler = handle_config_change
595
596         # call the config handler at startup if prereqs were met
597         if self._inotify:
598             with open(self._config_path) as json_file:
599                 data = json.load(json_file)
600             self.logger.debug("run: invoking config handler at start")
601             self._config_handler(self, data)
602
603     def register_callback(self, handler, message_type):
604         """
605         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
606
607         Parameters
608         ----------
609         handler: function
610             a function with the signature (summary, sbuf) to be called
611             when a message of type message_type is received
612         summary: dict
613             the rmr message summary
614         sbuf: ctypes c_void_p
615             Pointer to an rmr message buffer. The user must call free on this when done.
616
617         message:type: int
618             the message type to look for
619
620         Note if this method is called multiple times for a single message type, the "last one wins".
621         """
622         self._dispatch[message_type] = handler
623
624     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
625         """
626         This function should be called when the reactive Xapp is ready to start.
627         After start, the Xapp's handlers will be called on received messages.
628
629         Parameters
630         ----------
631         thread: bool (optional, default is False)
632             If False, execution is not returned and the framework loops forever.
633             If True, a thread is started to run the queue read/dispatch loop
634             and execution is returned to caller; the thread can be stopped
635             by calling the .stop() method.
636
637         rmr_timeout: integer (optional, default is 5 seconds)
638             Length of time to wait for an RMR message to arrive.
639
640         inotify_timeout: integer (optional, default is 0 seconds)
641             Length of time to wait for an inotify event to arrive.
642         """
643
644         def loop():
645             while self._keep_going:
646
647                 # poll RMR
648                 try:
649                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
650                     # dispatch
651                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
652                     if not func:
653                         func = self._default_handler
654                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
655                     func(self, summary, sbuf)
656                 except queue.Empty:
657                     # the get timed out
658                     pass
659
660                 # poll configuration file watcher
661                 try:
662                     events = self.config_check(timeout=inotify_timeout)
663                     for event in events:
664                         with open(self._config_path) as json_file:
665                             data = json.load(json_file)
666                         self.logger.debug("run: invoking config handler on change event {}".format(event))
667                         self._config_handler(self, data)
668                 except Exception as error:
669                     self.logger.error("run: configuration handler failed: {}".format(error))
670
671         if thread:
672             Thread(target=loop).start()
673         else:
674             loop()
675
676     def stop(self):
677         """
678         Sets the flag to end the dispatch loop.
679         """
680         super().stop()
681         self.logger.debug("Setting flag to end framework work loop.")
682         self._keep_going = False
683
684
685 class Xapp(_BaseXapp):
686     """
687     Represents a generic Xapp where the client provides a single function
688     for the framework to call at startup time (instead of providing callback
689     functions by message type). The Xapp writer must implement and provide a
690     function with a loop-forever construct similar to the `run` function in
691     the `RMRXapp` class.  That function should poll to retrieve RMR messages
692     and dispatch them appropriately, poll for configuration changes, etc.
693
694     Parameters
695     ----------
696     entrypoint: function
697         This function is called when the Xapp class's run method is invoked.
698         The function signature must be just function(self)
699     rmr_port: integer (optional, default is 4562)
700         Initialize RMR to listen on this port
701     rmr_wait_for_ready: boolean (optional, default is True)
702         Wait for RMR to signal ready before starting the dispatch loop
703     use_fake_sdl: boolean (optional, default is False)
704         Use an in-memory store instead of the real SDL service
705     """
706
707     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
708         """
709         Parameters
710         ----------
711
712         For the other parameters, see class _BaseXapp.
713         """
714         # init base
715         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
716         self._entrypoint = entrypoint
717
718     def run(self):
719         """
720         This function should be called when the general Xapp is ready to start.
721         """
722         self._entrypoint(self)
723
724     # there is no need for stop currently here (base has, and nothing
725     # special to do here)