X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=examples%2Fxapp_test.py;fp=examples%2Fxapp_test.py;h=b0be598b23cacee94f35a30b7d01dd7293a84656;hb=12486343219663d484705f05ab8d2ed3306f66d7;hp=0000000000000000000000000000000000000000;hpb=9c09be1e9598d4e145faea412b047b64d38feb22;p=ric-plt%2Fxapp-frame-py.git diff --git a/examples/xapp_test.py b/examples/xapp_test.py new file mode 100644 index 0000000..b0be598 --- /dev/null +++ b/examples/xapp_test.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +# ================================================================================== +# Copyright (c) 2022 Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== +import os +import sys +import time +import json +import logging +import datetime +import argparse +import threading +import http.server +import signal +import struct +import socket +import urllib.parse +from io import open +from time import gmtime, strftime + +#sys.path.insert(0, os.path.abspath("./")) +#sys.path.insert(0, os.path.abspath("./ricxappframe")) +sys.path.append(os.getcwd()) +from ricxappframe.xapp_frame import RMRXapp, rmr +from ricxappframe.xapp_sdl import SDLWrapper +from ricxappframe.xapp_symptomdata import Symptomdata +import ricxappframe.xapp_subscribe as subscribe +import ricxappframe.xapp_rest as ricrest +from mdclogpy import Logger, Level + +class Config(object): + def __init__(self, xapp_name, config_file): + self.config_file = config_file + self.xapp_name = xapp_name + self.cfg = None + self.keys = dict() + self.config() + + def config(self): + with open(self.config_file, 'r') as file: + cfg = file.read() + if cfg != None: + self.cfg = json.loads(cfg) + if self.cfg is not None: + self.controls = self.cfg['controls'] + + def get_key_item(self, key): + data = None + if self.keys.get(key) is not None: + data = self.keys[key] + return data + + def get_config(self): + data = None + with open(self.config_file, 'r') as file: + cfg = file.read() + if cfg != None: + self.cfg = json.loads(cfg) + # following is required by the appmgr - don't know why. + cfgescaped = cfg.replace('"', '\\"').replace('\n', '\\n') + data = '[{ "config": "' + cfgescaped + '", "metadata":{"configType":"json","xappName":"' + self.xapp_name + '"}}]' + if data == None: + logging.error("Config file %s empty or does not exists" % (self.config_file)) + return data + +def read_file(filename): + try: + with open(filename, 'r') as f: + data = f.read() + if len(data) == 0: + return None + return data + except IOError as error: + return None + + +class MyXapp(object): + + def __init__(self, name, service, address, port, config, loglevel): + signal.signal(signal.SIGQUIT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + signal.signal(signal.SIGINT, self.signal_handler) + self.running = False + + self.configData = Config(name, config) + + if self.configData.controls.get('logger'): + level = self.configData.controls['logger'].get('level') + + if port == None and self.configData.cfg['messaging'].get('ports'): + for item in self.configData.cfg['messaging'].get('ports'): + if item['name'] == http: + port = int(item['port']) + if address == None and self.configData.cfg.get('name'): + address = self.configData.cfg.get('name') + + # save the listen address and port for later use + self.port = port + self.address = address + + self.logger = Logger(name, loglevel) + # setup the symptomdata + symptomCfg = self.GetSymptomConfig() + self.symptomHndl = Symptomdata(service, name, "/tmp/", symptomCfg['url'], symptomCfg['timeout']) + + # create the thread HTTP server and set the uri handler callbacks + self.server = ricrest.ThreadedHTTPServer(address, port) + # trick to get the own handler with defined + self.server.handler.add_handler(self.server.handler, "GET", "config", "/ric/v1/config", self.configGetHandler) + self.server.handler.add_handler(self.server.handler, "GET", "healthAlive", "/ric/v1/health/alive", self.healthyGetAliveHandler) + self.server.handler.add_handler(self.server.handler, "GET", "healthReady", "/ric/v1/health/ready", self.healthyGetReadyHandler) + self.server.handler.add_handler(self.server.handler, "GET", "symptomdata", "/ric/v1/symptomdata", self.symptomdataGetHandler) + # start rest server + self.server.start() + # start RMR + self.startRMR(service, 4) + self.running = True + # now we can subscribe + self.Subscribe() + + def startRMR(self, service, level): + # handle the RMR_SEED_RT and RMR_RTG_SVC which is different in mcxapp + data = None + os.environ["RMR_SRC_ID"] = service + os.environ["RMR_LOG_VLEVEL"] = str(level) + os.environ["RMR_RTG_SVC"] = "4561" + rmrseed = os.environ.get('RMR_SEED_RT') + if rmrseed is not None: + data = read_file(rmrseed) + if data is None: + self.logger.warning("RMR seed file %s does not exists or is empty" % (rmrseed)) + else: + self.logger.info("RMR_SEED_RT seed file not set in environment") + data = read_file('uta-rtg.rt') + if data is not None: + os.environ['RMR_SEED_RT'] = "./uta-rtg.rt" + self.logger.info("Setting the default RMR_SEED_RT=uta-rtg.rt - content:") + else: + self.logger.info("Try to export the RMR_SEED_RT file if your RMR is not getting ready") + self.rmrInit(b"4560") + + def signal_handler(self, sig, frame): + if self.running is True: + self.server.stop() + rmr.rmr_close(self.rmr_mrc) + self.running = False + sys.exit(0) + + def rmrInit(self, initbind): + # Init rmr + self.rmr_mrc = rmr.rmr_init(initbind, rmr.RMR_MAX_RCV_BYTES, 0x00) + while rmr.rmr_ready(self.rmr_mrc) == 0: + time.sleep(1) + self.logger.info('RMR not yet ready') + rmr.rmr_set_stimeout(self.rmr_mrc, 1) + rmr.rmr_set_vlevel(5) + self.logger.info('RMR ready') + + def GetSymptomConfig(self): + if self.configData.cfg['controls'].get('symptomdata').get('lwsd'): + return self.configData.cfg['controls'].get('symptomdata').get('lwsd') + + def GetSubsConfig(self): + if self.configData.cfg['controls'].get('subscription'): + return self.configData.cfg['controls'].get('subscription') + + def Subscribe(self): + self.subsCfgDetail = self.GetSubsConfig() + if self.subsCfgDetail != None: + # this is example subscription, for your use case fill the attributes according to your needs + self.subscriber = subscribe.NewSubscriber(self.subsCfgDetail['url'] + 'ric/v1') + # add as well the own subscription response callback handler + if self.subscriber.ResponseHandler(self.subsResponseCB, self.server) is not True: + self.logger.error("Error when trying to set the subscription reponse callback") + # setup the subscription data + subEndPoint = self.subscriber.SubscriptionParamsClientEndpoint(self.subsCfgDetail['clientEndpoint'], self.port, 4061) + subsDirective = self.subscriber.SubscriptionParamsE2SubscriptionDirectives(10, 2, False) + subsequentAction = self.subscriber.SubsequentAction("continue", "w10ms") + actionDefinitionList = self.subscriber.ActionToBeSetup(1, "policy", (11,12,13,14,15), subsequentAction) + subsDetail = self.subscriber.SubscriptionDetail(12110, (1,2,3,4,5), actionDefinitionList) + # subscription data ready, make the subscription + subObj = self.subscriber.SubscriptionParams("sub10", subEndPoint,"gnb123456",1231, subsDirective, subsDetail) + self.logger.info("Sending the subscription to %s" %(self.subsCfgDetail['url'] + 'ric/v1')) + self.logger.info(subObj.to_dict()) + # subscribe + data, reason, status = self.subscriber.Subscribe(subObj) + # returns the json data, make it dictionary + self.logger.info("Getting the subscription reponse") + self.logger.info(json.loads(data)) + + def Unsubscribe(self): + reason, status = self.subscriber.UnSubscribe("ygwefwebw") + + def QuerySubscribtions(self): + data, reason, status = self.subscriber.QuerySubscriptions() + + def healthyGetReadyHandler(self, name, path, data, ctype): + response = server.initResponse() + response['payload'] = ("{'status': 'ready'}") + return response + + def healthyGetAliveHandler(self, name, path, data, ctype): + response = server.initResponse() + response['payload'] = ("{'status': 'alive'}") + return response + + def subsResponseCB(self, name, path, data, ctype): + response = server.initResponse() + response['payload'] = ("{}") + return response + + def getSymptomData(self, uriparams): + paramlist = urllib.parse.parse_qs(uriparams) + [x.upper() for x in paramlist] + fromtime = 0 + totime = 0 + print(paramlist) + if paramlist.get('fromTime'): + fromtime = getSeconds(paramlist.get('fromTime')[0]) + if paramlist.get('toTime'): + totime = getSeconds(paramlist.get('toTime')[0]) + zipfile = self.symptomHndl.collect("symptomdata"+'-%Y-%m-%d-%H-%M-%S.zip', ('examples/.*.py',), fromtime, totime) + if zipfile != None: + (zipfile, size, data) = self.symptomHndl.read() + return (zipfile, size, data) + return (None, 0, None) + + def symptomdataGetHandler(self, name, path, data, ctype): + reponse = ricrest.initResponse() + (zipfile, size, filedata) = self.getSymptomData(self.path[20:]) + if filedata != None: + reponse['payload'] = filedata + reponse['ctype'] = 'application/zip' + reponse['attachment'] = "symptomdata.zip" + reponse['mode'] = 'binary' + return reponse + logging.error("Symptom data does not exists") + reponse['response'] = 'System error - symptomdata does not exists' + reponse['status'] = 500 + return reponse + + def configGetHandler(self, name, path, data, ctype): + response = server.initResponse() + response['payload'] = (self.configData.get_config()) + return response + +def removeEnvVar(evar): + if evar in os.environ: + del os.environ[evar] + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('-port', dest='port', help='HTTP server listen port, default 8088', required=False, type=int) + parser.add_argument('-address', dest='address', help='IP listen address, default all interfaces', required=False, type=str) + parser.add_argument('-config', dest='config', help='config file path name, default /opt/ric/config/config.json', required=False, type=str) + parser.add_argument('-xapp', dest='xapp', help='xapp name', required=True, type=str) + parser.add_argument('-service', dest='service', help='xapp service name (same as pod host name)', required=True, type=str) + parser.add_argument('-verbose', dest='verbose', help='verbose logging level', required=False, type=int) + + args = parser.parse_args() + + if args.port is None: + args.port = 8088 + if args.address is None: + args.address = "0.0.0.0" + if args.config is None: + args.config = '/opt/ric/config/config.json' + + # remove proxy so that it won't impact to rest calls + removeEnvVar('HTTPS_PROXY') + removeEnvVar('HTTP_PROXY') + removeEnvVar('https_proxy') + removeEnvVar('http_proxy') + + # starting argument option will overwrite the config settings + if args.verbose is None: + args.verbose = 2 + + loglevel = Level.INFO + if args.verbose == 0: + loglevel = Level.ERROR + if args.verbose == 1: + loglevel = Level.WARNING + elif args.verbose == 2: + loglevel = Level.INFO + elif args.verbose >= 3: + loglevel = Level.DEBUG + + myxapp = MyXapp(args.xapp, args.service, args.address, args.port, args.config, loglevel) + + while True: + print("Waiting for a message, will timeout after 10s") + rmr_sbuf = rmr.rmr_torcv_msg(myxapp.rmr_mrc, None, 10000) + summary = rmr.message_summary(rmr_sbuf) + if summary[rmr.RMR_MS_MSG_STATE] == 12: + print("Nothing received") + else: + print("Message received!: {}".format(summary)) + data = rmr.get_payload(rmr_sbuf) + rmr.rmr_free_msg(rmr_sbuf) + +if __name__ == '__main__': + main() + +