RIC-642 related changes: REST subscription, rnib enhancements, symptomdata, rest...
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 import time
26 from threading import Thread
27 from typing import List, Set
28
29 import inotify_simple
30 from mdclogpy import Logger
31
32 from ricxappframe import xapp_rmr
33 from ricxappframe.constants import sdl_namespaces
34
35 import ricxappframe.entities.rnib.nodeb_info_pb2 as pb_nbi
36 import ricxappframe.entities.rnib.cell_pb2 as pb_cell
37 from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
38 from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
39
40 from ricxappframe.rmr import rmr
41 from ricxappframe.util.constants import Constants
42 from ricxappframe.xapp_sdl import SDLWrapper
43 import requests
44
45
46 class _BaseXapp:
47     """
48     This class initializes RMR, starts a thread that checks for incoming
49     messages, provisions an SDL object and optionally creates a
50     config-file watcher.  This private base class should not be
51     instantiated by clients directly, but it defines many public methods
52     that may be used by clients.
53
54     If environment variable CONFIG_FILE is defined, and that variable
55     contains a path to an existing file, a watcher is defined to monitor
56     modifications (writes) to that file using the Linux kernel's inotify
57     feature. The watcher must be polled by calling method
58     config_check().
59
60     Parameters
61     ----------
62     rmr_port: int (optional, default is 4562)
63         Port on which the RMR library listens for incoming messages.
64
65     rmr_wait_for_ready: bool (optional, default is True)
66         If this is True, then init waits until RMR is ready to send,
67         which includes having a valid routing file. This can be set
68         to False if the client wants to *receive only*.
69
70     use_fake_sdl: bool (optional, default is False)
71         if this is True, it uses the DBaaS "fake dict backend" instead
72         of Redis or other backends. Set this to True when developing
73         an xapp or during unit testing to eliminate the need for DBaaS.
74
75     post_init: function (optional, default is None)
76         Runs this user-provided function at the end of the init method;
77         its signature should be post_init(self)
78     """
79
80     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
81         """
82         Documented in the class comment.
83         """
84         # PUBLIC, can be used by xapps using self.(name):
85         self.logger = Logger(name=__name__)
86         self._appthread = None
87
88         # Start rmr rcv thread
89         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
90         self._mrc = self._rmr_loop.mrc  # for convenience
91
92         # SDL
93         self.sdl = SDLWrapper(use_fake_sdl)
94
95         # Config
96         # The environment variable specifies the path to the Xapp config file
97         self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None)
98         if self._config_path and os.path.isfile(self._config_path):
99             self._inotify = inotify_simple.INotify()
100             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
101             self.logger.debug("__init__: watching config file {}".format(self._config_path))
102         else:
103             self._inotify = None
104             self.logger.warning("__init__: NOT watching any config file")
105
106         # used for thread control of Registration of Xapp
107         self._keep_registration = True
108
109         # configuration data  for xapp registration and deregistration
110         self._config_data = None
111         if self._config_path and os.path.isfile(self._config_path):
112             with open(self._config_path) as json_file:
113                 self._config_data = json.load(json_file)
114         else:
115             self._keep_registration = False
116             self.logger.error("__init__: Cannot Read config file for xapp Registration")
117             self._config_data = {}
118
119         self._appthread = Thread(target=self.registerXapp).start()
120
121         # run the optionally provided user post init
122         if post_init:
123             post_init(self)
124
125     def get_service(self, host, service):
126         """
127         To find the url for connecting to the service
128
129         Parameters
130         ----------
131         host: string
132             defines the hostname in the url
133         service: string
134             defines the servicename in the url
135
136         Returns
137         -------
138         string
139             url for the service
140         """
141         app_namespace = self._config_data.get("APP_NAMESPACE")
142         if app_namespace is None:
143             app_namespace = Constants.DEFAULT_XAPP_NS
144         self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace))
145         if app_namespace is not None and host is not None:
146             svc = service.format(app_namespace.upper(), host.upper())
147             urlkey = svc.replace("-", "_")
148             url = os.environ.get(urlkey).split("//")
149             self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url))
150             if len(url) > 1:
151                 return url[1]
152         return ""
153
154     def do_post(self, plt_namespace, url, msg):
155         """
156         registration of the xapp using the url and json msg
157
158         Parameters
159         ----------
160         plt_namespace: string
161             platform namespace where the xapp is running
162         url: string
163             url for xapp registration
164         msg: string
165             json msg containing the xapp details
166
167         Returns
168         -------
169         bool
170             whether or not the xapp is registered
171         """
172         if url is None:
173             self.logger.error("url is empty ")
174             return False
175         if plt_namespace is None:
176             self.logger.error("plt_namespace is empty")
177             return False
178         try:
179             request_url = url.format(plt_namespace, plt_namespace)
180             resp = requests.post(request_url, json=msg)
181             self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
182             self.logger.debug("Response Text : {}".format(resp.text))
183             return resp.status_code == 200 or resp.status_code == 201
184         except requests.exceptions.RequestException as err:
185             self.logger.error("Error : {}".format(err))
186             return format(err)
187         except requests.exceptions.HTTPError as errh:
188             self.logger.error("Http Error: {}".format(errh))
189             return errh
190         except requests.exceptions.ConnectionError as errc:
191             self.logger.error("Error Connecting: {}".format(errc))
192             return errc
193         except requests.exceptions.Timeout as errt:
194             self.logger.error("Timeout Error: {}".format(errt))
195             return errt
196
197     def register(self):
198         """
199             function to registers the xapp
200
201         Returns
202         -------
203         bool
204             whether or not the xapp is registered
205         """
206         hostname = os.environ.get("HOSTNAME")
207         xappname = self._config_data.get("name")
208         xappversion = self._config_data.get("version")
209         pltnamespace = os.environ.get("PLT_NAMESPACE")
210         if pltnamespace is None:
211             pltnamespace = Constants.DEFAULT_PLT_NS
212         self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format(
213             hostname, xappname, xappversion, pltnamespace))
214
215         http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP)
216         rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR)
217         if http_endpoint == "" or rmr_endpoint == "":
218             self.logger.error(
219                 "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint,
220                                                                                               rmr_endpoint))
221             return False
222         self.logger.debug(
223             "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint "
224             ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint,
225                                           self._config_data.get("CONFIG_PATH")))
226         request_string = {
227             "appName": hostname,
228             "appVersion": xappversion,
229             "configPath": "",
230             "appInstanceName": xappname,
231             "httpEndpoint": http_endpoint,
232             "rmrEndpoint": rmr_endpoint,
233             "config": json.dumps(self._config_data)
234         }
235         self.logger.info("REQUEST STRING :{}".format(request_string))
236         return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string)
237
238     def registerXapp(self):
239         """
240             registers the xapp
241         """
242         retries = 5
243         while self._keep_registration and retries > 0:
244             time.sleep(2)
245             retries = retries-1
246             # checking for rmr/sdl/xapp health
247             healthy = self.healthcheck()
248             if not healthy:
249                 self.logger.warning(
250                     "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
251                 continue
252
253             self.logger.debug("Application='{}'  is now up and ready, continue with registration ...".format(
254                 self._config_data.get("name")))
255             if self.register():
256                 self.logger.debug("Registration done, proceeding with startup ...")
257                 break
258
259     def deregister(self):
260         """
261             Deregisters the xapp
262
263         Returns
264         -------
265         bool
266             whether or not the xapp is registered
267         """
268         healthy = self.healthcheck()
269         if not healthy:
270             self.logger.error("RMR or SDL or xapp == Not Healthy")
271             return None
272         if self._config_data is None:
273             return None
274         name = os.environ.get("HOSTNAME")
275         xappname = self._config_data.get("name")
276         pltnamespace = os.environ.get("PLT_NAMESPACE")
277         if pltnamespace is None:
278             pltnamespace = Constants.DEFAULT_PLT_NS
279         request_string = {
280                 "appName": name,
281                 "appInstanceName": xappname,
282         }
283
284         return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string)
285
286     def xapp_shutdown(self):
287         """
288              Deregisters the xapp while shutting down
289         """
290         self.deregister()
291         self.logger.debug("Wait for xapp to get unregistered")
292         time.sleep(10)
293
294     # Public rmr methods
295
296     def rmr_get_messages(self):
297         """
298         Returns a generator iterable over all items in the queue that
299         have not yet been read by the client xapp. Each item is a tuple
300         (S, sbuf) where S is a message summary dict and sbuf is the raw
301         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
302         finished with each sbuf to prevent memory leaks!
303         """
304         while not self._rmr_loop.rcv_queue.empty():
305             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
306             yield (summary, sbuf)
307
308     def rmr_send(self, payload, mtype, retries=100):
309         """
310         Allocates a buffer, sets payload and mtype, and sends
311
312         Parameters
313         ----------
314         payload: bytes
315             payload to set
316         mtype: int
317             message type
318         retries: int (optional)
319             Number of times to retry at the application level before excepting RMRFailure
320
321         Returns
322         -------
323         bool
324             whether or not the send worked after retries attempts
325         """
326         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True,
327                                  mtype=mtype)
328
329         for _ in range(retries):
330             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
331             if sbuf.contents.state == 0:
332                 self.rmr_free(sbuf)
333                 return True
334
335         self.rmr_free(sbuf)
336         return False
337
338     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
339         """
340         Allows the xapp to return to sender, possibly adjusting the
341         payload and message type before doing so.  This does NOT free
342         the sbuf for the caller as the caller may wish to perform
343         multiple rts per buffer. The client needs to free.
344
345         Parameters
346         ----------
347         sbuf: ctypes c_void_p
348              Pointer to an rmr message buffer
349         new_payload: bytes (optional)
350             New payload to set
351         new_mtype: int (optional)
352             New message type (replaces the received message)
353         retries: int (optional, default 100)
354             Number of times to retry at the application level
355
356         Returns
357         -------
358         bool
359             whether or not the send worked after retries attempts
360         """
361         for _ in range(retries):
362             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
363             if sbuf.contents.state == 0:
364                 return True
365
366         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
367         return False
368
369     def rmr_free(self, sbuf):
370         """
371         Frees an rmr message buffer after use
372
373         Note: this does not need to be a class method, self is not
374         used. However if we break it out as a function we need a home
375         for it.
376
377         Parameters
378         ----------
379         sbuf: ctypes c_void_p
380              Pointer to an rmr message buffer
381         """
382         rmr.rmr_free_msg(sbuf)
383
384     # Convenience (pass-thru) function for invoking SDL.
385
386     def sdl_set(self, namespace, key, value, usemsgpack=True):
387         """
388         ** Deprecate Warning **
389         ** Will be removed in a future function **
390
391         Stores a key-value pair to SDL, optionally serializing the value
392         to bytes using msgpack.
393
394         Parameters
395         ----------
396         namespace: string
397             SDL namespace
398         key: string
399             SDL key
400         value:
401             Object or byte array to store.  See the `usemsgpack` parameter.
402         usemsgpack: boolean (optional, default is True)
403             Determines whether the value is serialized using msgpack before storing.
404             If usemsgpack is True, the msgpack function `packb` is invoked
405             on the value to yield a byte array that is then sent to SDL.
406             Stated differently, if usemsgpack is True, the value can be anything
407             that is serializable by msgpack.
408             If usemsgpack is False, the value must be bytes.
409         """
410         self.sdl.set(namespace, key, value, usemsgpack)
411
412     def sdl_get(self, namespace, key, usemsgpack=True):
413         """
414         ** Deprecate Warning **
415         ** Will be removed in a future function **
416
417         Gets the value for the specified namespace and key from SDL,
418         optionally deserializing stored bytes using msgpack.
419
420         Parameters
421         ----------
422         namespace: string
423             SDL namespace
424         key: string
425             SDL key
426         usemsgpack: boolean (optional, default is True)
427             If usemsgpack is True, the byte array stored by SDL is deserialized
428             using msgpack to yield the original object that was stored.
429             If usemsgpack is False, the byte array stored by SDL is returned
430             without further processing.
431
432         Returns
433         -------
434         Value
435             See the usemsgpack parameter for an explanation of the returned value type.
436             Answers None if the key is not found.
437         """
438         return self.sdl.get(namespace, key, usemsgpack)
439
440     def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
441         """
442         ** Deprecate Warning **
443         ** Will be removed in a future function **
444
445         Gets all key-value pairs in the specified namespace
446         with keys that start with the specified prefix,
447         optionally deserializing stored bytes using msgpack.
448
449         Parameters
450         ----------
451         nnamespaces: string
452            SDL namespace
453         prefix: string
454             the key prefix
455         usemsgpack: boolean (optional, default is True)
456             If usemsgpack is True, the byte array stored by SDL is deserialized
457             using msgpack to yield the original value that was stored.
458             If usemsgpack is False, the byte array stored by SDL is returned
459             without further processing.
460
461         Returns
462         -------
463         Dictionary of key-value pairs
464             Each key has the specified prefix.
465             The value object (its type) depends on the usemsgpack parameter,
466             but is either a Python object or raw bytes as discussed above.
467             Answers an empty dictionary if no keys matched the prefix.
468         """
469         return self.sdl.find_and_get(namespace, prefix, usemsgpack)
470
471     def sdl_delete(self, namespace, key):
472         """
473         ** Deprecate Warning **
474         ** Will be removed in a future function **
475
476         Deletes the key-value pair with the specified key in the specified namespace.
477
478         Parameters
479         ----------
480         namespace: string
481            SDL namespace
482         key: string
483             SDL key
484         """
485         self.sdl.delete(namespace, key)
486
487     def _get_rnib_info(self, node_type):
488         """
489         Since the difference between get_list_gnb_ids and get_list_enb_ids is only node-type,
490         this function extracted from the duplicated logic.
491
492         Parameters
493         ----------
494         node_type: string
495            Type of node. This is EnumDescriptor.
496            Available node types
497            - UNKNOWN
498            - ENB
499            - GNB
500
501         Returns
502         -------
503             List: (NbIdentity)
504
505         Raises
506         ------
507             SdlTypeError: If function's argument is of an inappropriate type.
508             NotConnected: If SDL is not connected to the backend data storage.
509             RejectedByBackend: If backend data storage rejects the request.
510             BackendError: If the backend data storage fails to process the request.
511         """
512         nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
513         ret: List[NbIdentity] = []
514         for nbid_string in nbid_strings:
515             nbid = NbIdentity()
516             nbid.ParseFromString(nbid_string)
517             ret.append(nbid)
518         return ret
519
520     def get_list_gnb_ids(self):
521         """
522         Retrieves the list of gNodeb identity entities
523
524         gNodeb information is stored in SDL by E2Manager. Therefore, gNode information
525         is stored in SDL's `e2Manager` namespace as protobuf serialized.
526
527         Returns
528         -------
529             List: (NbIdentity)
530
531         Raises
532         ------
533             SdlTypeError: If function's argument is of an inappropriate type.
534             NotConnected: If SDL is not connected to the backend data storage.
535             RejectedByBackend: If backend data storage rejects the request.
536             BackendError: If the backend data storage fails to process the request.
537         """
538         return self._get_rnib_info(Node.Type.Name(Node.GNB))
539
540     def get_list_enb_ids(self):
541         """
542         Retrieves the list of eNodeb identity entities
543
544         eNodeb information is stored in SDL by E2Manager. Therefore, eNode information
545         is stored in SDL's `e2Manager` namespace as protobuf serialized.
546
547         Returns
548         -------
549             List: (NbIdentity)
550
551         Raises
552         ------
553             SdlTypeError: If function's argument is of an inappropriate type.
554             NotConnected: If SDL is not connected to the backend data storage.
555             RejectedByBackend: If backend data storage rejects the request.
556             BackendError: If the backend data storage fails to process the request.
557         """
558         return self._get_rnib_info(Node.Type.Name(Node.ENB))
559
560     """
561         Following RNIB methods are made to be inline of the go-lang based RNIB methods.
562         Method names are same as in repository:
563         gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/rnib
564     """
565     def GetNodeb(self, inventoryName):
566         """
567         Returns nodeb info
568         In RNIB SDL key is defined following way: RAN:<inventoryName>
569
570         Parameters
571         ----------
572         inventoryName: string
573
574         Returns
575         -------
576             NodebInfo()
577
578         Raises
579         ------
580             SdlTypeError: If function's argument is of an inappropriate type.
581             NotConnected: If SDL is not connected to the backend data storage.
582             RejectedByBackend: If backend data storage rejects the request.
583             BackendError: If the backend data storage fails to process the request.
584         """
585         nbid_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, 'RAN:' + inventoryName, usemsgpack=False)
586         if nbid_string is not None:
587             nbinfo = pb_nbi.NodebInfo()
588             nbinfo.ParseFromString(nbid_string)
589             return nbinfo
590         return None
591
592     def GetNodebByGlobalNbId(self, nodeType, plmnId, nbId):
593         """
594         Returns nodeb identity based on type, plmn id and node id
595         In RNIB SDL key is defined following way: <nodeType>:<plmnId>:<nbId>
596
597         Parameters
598         ----------
599             nodeType: string
600             plmnId: string
601             nbId: string
602
603         Returns
604         -------
605             NbIdentity()
606
607         Raises
608         ------
609             SdlTypeError: If function's argument is of an inappropriate type.
610             NotConnected: If SDL is not connected to the backend data storage.
611             RejectedByBackend: If backend data storage rejects the request.
612             BackendError: If the backend data storage fails to process the request.
613         """
614         nbid_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, nodeType + ':' + plmnId + ':' + nbId, usemsgpack=False)
615         if nbid_string is not None:
616             nbid = NbIdentity()
617             nbid.ParseFromString(nbid_string)
618             return nbid
619         return None
620
621     def GetCellList(self, inventoryName):
622         """
623         Returns nodeb served cell list from the saved node data
624         In RNIB SDL key is defined following way: RAN:<inventoryName>
625
626         Parameters
627         ----------
628             nodeType: string
629             plmnId: string
630             nbId: string
631
632         Returns
633         -------
634             ServedCellInfo() in case of ENB
635             ServedNRCell() in case of GNB
636
637         Raises
638         ------
639             SdlTypeError: If function's argument is of an inappropriate type.
640             NotConnected: If SDL is not connected to the backend data storage.
641             RejectedByBackend: If backend data storage rejects the request.
642             BackendError: If the backend data storage fails to process the request.
643         """
644         nodeb = self.GetNodeb(inventoryName)
645         if nodeb is not None:
646             if nodeb.HasField('enb'):
647                 return nodeb.enb.served_cells
648             elif nodeb.HasField('gnb'):
649                 return nodeb.gnb.served_nr_cells
650         return None
651
652     def GetCellById(self, cell_type, cell_id):
653         """
654         Returns cell info by cell type and id.
655         In RNIB SDL keys are defined based on the cell type:
656         ENB type CELL:<cell_id>
657         GNB type NRCELL:<cell_id>
658
659         Parameters
660         ----------
661         cell_type: string
662            Available cell types
663            - ENB
664            - GNB
665
666         Returns
667         -------
668             Cell()
669
670         Raises
671         ------
672             SdlTypeError: If function's argument is of an inappropriate type.
673             NotConnected: If SDL is not connected to the backend data storage.
674             RejectedByBackend: If backend data storage rejects the request.
675             BackendError: If the backend data storage fails to process the request.
676         """
677         cellstr = None
678         if cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.LTE_CELL):
679             cellstr = 'CELL'
680         elif cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.NR_CELL):
681             cellstr = 'NRCELL'
682         if cellstr is not None:
683             cell_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, cellstr + ':' + cell_id, usemsgpack=False)
684             if cell_string is not None:
685                 cell = pb_cell.Cell()
686                 cell.ParseFromString(cell_string)
687                 return cell
688         return None
689
690     def GetListNodebIds(self):
691         """
692         Returns both enb and gnb NbIdentity list
693
694         Returns
695         -------
696             List: (NbIdentity)
697
698         Raises
699         ------
700             SdlTypeError: If function's argument is of an inappropriate type.
701             NotConnected: If SDL is not connected to the backend data storage.
702             RejectedByBackend: If backend data storage rejects the request.
703             BackendError: If the backend data storage fails to process the request.
704         """
705         nlist1 = self._get_rnib_info(Node.Type.Name(Node.ENB))
706         nlist2 = self._get_rnib_info(Node.Type.Name(Node.GNB))
707
708         for n in nlist2:
709             nlist1.append(n)
710         return nlist1
711
712     def GetCell(self, inventoryName, pci):
713         """
714         Returns cell info using pci
715         In RNIB SDL key is defined following way: PCI:<inventoryName>:<pci hex val>
716
717         Parameters
718         ----------
719         inventoryName: string
720         pci: int
721
722         Returns
723         -------
724             Cell()
725
726         Raises
727         ------
728             SdlTypeError: If function's argument is of an inappropriate type.
729             NotConnected: If SDL is not connected to the backend data storage.
730             RejectedByBackend: If backend data storage rejects the request.
731             BackendError: If the backend data storage fails to process the request.
732         """
733         cell_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, 'PCI:{0:s}:{1:02x}'.format(inventoryName, pci), usemsgpack=False)
734         if cell_string is not None:
735             cell = pb_cell.Cell()
736             cell.ParseFromString(cell_string)
737             return cell
738         return None
739
740     def GetRanFunctionDefinition(self, inventoryName, ran_function_oid):
741         """
742         Returns GNB ran function definition list based on the ran_function_oid
743         In RNIB SDL key is defined following way: RAN:<inventoryName>
744
745         Parameters
746         ----------
747             inventoryName: string
748             ran_function_oid: int
749
750         Returns
751         -------
752             array of ran_function_definition matching to ran_function_oid
753
754         Raises
755         ------
756             SdlTypeError: If function's argument is of an inappropriate type.
757             NotConnected: If SDL is not connected to the backend data storage.
758             RejectedByBackend: If backend data storage rejects the request.
759             BackendError: If the backend data storage fails to process the request.
760         """
761         nodeb = self.GetNodeb(inventoryName)
762         if nodeb is not None:
763             if nodeb.HasField('gnb') and nodeb.gnb.ran_functions is not None:
764                 ranFDList = []
765                 for rf in nodeb.gnb.ran_functions:
766                     if rf.ran_function_oid == ran_function_oid:
767                         ranFDList.append(rf.ran_function_definition)
768                 return ranFDList
769         return None
770
771     def healthcheck(self):
772         """
773         this needs to be understood how this is supposed to work
774         """
775         return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
776
777     # Convenience function for discovering config change events
778
779     def config_check(self, timeout=0):
780         """
781         Checks the watcher for configuration-file events. The watcher
782         prerequisites and event mask are documented in __init__().
783
784         Parameters
785         ----------
786         timeout: int (optional)
787             Number of seconds to wait for a configuration-file event, default 0.
788
789         Returns
790         -------
791         List of Events, possibly empty
792             An event is a tuple with objects wd, mask, cookie and name.
793             For example::
794
795                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
796
797         """
798         if not self._inotify:
799             return []
800         events = self._inotify.read(timeout=timeout)
801         return list(events)
802
803     def stop(self):
804         """
805         cleans up and stops the xapp rmr thread (currently). This is
806         critical for unit testing as pytest will never return if the
807         thread is running.
808
809         TODO: can we register a ctrl-c handler so this gets called on
810         ctrl-c? Because currently two ctrl-c are needed to stop.
811         """
812         if self._appthread is not None:
813             self._appthread.join()
814
815         self.xapp_shutdown()
816
817         self._rmr_loop.stop()
818
819
820 # Public classes that Xapp writers should instantiate or subclass
821 # to implement an Xapp.
822
823
824 class RMRXapp(_BaseXapp):
825     """
826     Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
827     only performs an action when a message is received.  Clients should
828     invoke the run method, which has a loop that waits for RMR messages
829     and calls the appropriate client-registered consume callback on each.
830
831     If environment variable CONFIG_FILE is defined, and that variable
832     contains a path to an existing file, this class polls a watcher
833     defined on that file to detect file-write events, and invokes a
834     configuration-change handler on each event. The handler is also
835     invoked at startup.  If no handler function is supplied to the
836     constructor, this class defines a default handler that only logs a
837     message.
838
839     Parameters
840     ----------
841     default_handler: function
842         A function with the signature (summary, sbuf) to be called when a
843         message type is received for which no other handler is registered.
844     default_handler argument summary: dict
845         The RMR message summary, a dict of key-value pairs
846     default_handler argument sbuf: ctypes c_void_p
847         Pointer to an RMR message buffer. The user must call free on this when done.
848     config_handler: function (optional, default is documented above)
849         A function with the signature (json) to be called at startup and each time
850         a configuration-file change event is detected. The JSON object is read from
851         the configuration file, if the prerequisites are met.
852     config_handler argument json: dict
853         The contents of the configuration file, parsed as JSON.
854     rmr_port: integer (optional, default is 4562)
855         Initialize RMR to listen on this port
856     rmr_wait_for_ready: boolean (optional, default is True)
857         Wait for RMR to signal ready before starting the dispatch loop
858     use_fake_sdl: boolean (optional, default is False)
859         Use an in-memory store instead of the real SDL service
860     post_init: function (optional, default None)
861         Run this function after the app initializes and before the dispatch loop starts;
862         its signature should be post_init(self)
863     """
864
865     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False,
866                  post_init=None):
867         """
868         Also see _BaseXapp
869         """
870         # init base
871         super().__init__(
872             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
873         )
874
875         # setup callbacks
876         self._default_handler = default_handler
877         self._config_handler = config_handler
878         self._dispatch = {}
879
880         # used for thread control
881         self._keep_going = True
882
883         # register a default healthcheck handler
884         # this default checks that rmr is working and SDL is working
885         # the user can override this and register their own handler
886         # if they wish since the "last registered callback wins".
887         def handle_healthcheck(self, summary, sbuf):
888             healthy = self.healthcheck()
889             payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
890             self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP)
891             self.rmr_free(sbuf)
892
893         self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ)
894
895         # define a default configuration-change handler if none was provided.
896         if not config_handler:
897             def handle_config_change(self, config):
898                 self.logger.debug("xapp_frame: default config handler invoked")
899
900             self._config_handler = handle_config_change
901
902         # call the config handler at startup if prereqs were met
903         if self._inotify:
904             with open(self._config_path) as json_file:
905                 data = json.load(json_file)
906             self.logger.debug("run: invoking config handler at start")
907             self._config_handler(self, data)
908
909     def register_callback(self, handler, message_type):
910         """
911         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
912
913         Parameters
914         ----------
915         handler: function
916             a function with the signature (summary, sbuf) to be called
917             when a message of type message_type is received
918         summary: dict
919             the rmr message summary
920         sbuf: ctypes c_void_p
921             Pointer to an rmr message buffer. The user must call free on this when done.
922
923         message:type: int
924             the message type to look for
925
926         Note if this method is called multiple times for a single message type, the "last one wins".
927         """
928         self._dispatch[message_type] = handler
929
930     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
931         """
932         This function should be called when the reactive Xapp is ready to start.
933         After start, the Xapp's handlers will be called on received messages.
934
935         Parameters
936         ----------
937         thread: bool (optional, default is False)
938             If False, execution is not returned and the framework loops forever.
939             If True, a thread is started to run the queue read/dispatch loop
940             and execution is returned to caller; the thread can be stopped
941             by calling the .stop() method.
942
943         rmr_timeout: integer (optional, default is 5 seconds)
944             Length of time to wait for an RMR message to arrive.
945
946         inotify_timeout: integer (optional, default is 0 seconds)
947             Length of time to wait for an inotify event to arrive.
948         """
949
950         def loop():
951             while self._keep_going:
952
953                 # poll RMR
954                 try:
955                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
956                     # dispatch
957                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
958                     if not func:
959                         func = self._default_handler
960                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
961                     func(self, summary, sbuf)
962                 except queue.Empty:
963                     # the get timed out
964                     pass
965
966                 # poll configuration file watcher
967                 try:
968                     events = self.config_check(timeout=inotify_timeout)
969                     for event in events:
970                         with open(self._config_path) as json_file:
971                             data = json.load(json_file)
972                         self.logger.debug("run: invoking config handler on change event {}".format(event))
973                         self._config_handler(self, data)
974                 except Exception as error:
975                     self.logger.error("run: configuration handler failed: {}".format(error))
976
977         if thread:
978             Thread(target=loop).start()
979         else:
980             loop()
981
982     def stop(self):
983         """
984         Sets the flag to end the dispatch loop.
985         """
986         super().stop()
987         self.logger.debug("Setting flag to end framework work loop.")
988         self._keep_going = False
989
990
991 class Xapp(_BaseXapp):
992     """
993     Represents a generic Xapp where the client provides a single function
994     for the framework to call at startup time (instead of providing callback
995     functions by message type). The Xapp writer must implement and provide a
996     function with a loop-forever construct similar to the `run` function in
997     the `RMRXapp` class.  That function should poll to retrieve RMR messages
998     and dispatch them appropriately, poll for configuration changes, etc.
999
1000     Parameters
1001     ----------
1002     entrypoint: function
1003         This function is called when the Xapp class's run method is invoked.
1004         The function signature must be just function(self)
1005     rmr_port: integer (optional, default is 4562)
1006         Initialize RMR to listen on this port
1007     rmr_wait_for_ready: boolean (optional, default is True)
1008         Wait for RMR to signal ready before starting the dispatch loop
1009     use_fake_sdl: boolean (optional, default is False)
1010         Use an in-memory store instead of the real SDL service
1011     """
1012
1013     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
1014         """
1015         Parameters
1016         ----------
1017
1018         For the other parameters, see class _BaseXapp.
1019         """
1020         # init base
1021         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
1022         self._entrypoint = entrypoint
1023
1024     def run(self):
1025         """
1026         This function should be called when the general Xapp is ready to start.
1027         """
1028         self._entrypoint(self)
1029
1030     # there is no need for stop currently here (base has, and nothing
1031     # special to do here)