X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=examples%2Fxapp_subscribe.py;fp=examples%2Fxapp_subscribe.py;h=ab5aaa6cbab15eadbdbd65973c232842faeb7926;hb=12486343219663d484705f05ab8d2ed3306f66d7;hp=0000000000000000000000000000000000000000;hpb=9c09be1e9598d4e145faea412b047b64d38feb22;p=ric-plt%2Fxapp-frame-py.git diff --git a/examples/xapp_subscribe.py b/examples/xapp_subscribe.py new file mode 100644 index 0000000..ab5aaa6 --- /dev/null +++ b/examples/xapp_subscribe.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +# ================================================================================== +# Copyright (c) 2022 Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== +import os +import sys +import time +import json +import logging +import datetime +import argparse +import threading +import http.server +import signal +import struct +import socket +import urllib.parse +from io import open +from time import gmtime, strftime + +#sys.path.insert(0, os.path.abspath("./")) +#sys.path.insert(0, os.path.abspath("./ricxappframe")) +sys.path.append(os.getcwd()) +from ricxappframe.xapp_frame import RMRXapp, rmr +from ricxappframe.xapp_sdl import SDLWrapper +from ricxappframe.xapp_symptomdata import Symptomdata +import ricxappframe.xapp_subscribe as subscribe +import ricxappframe.xapp_rest as ricrest + +# rmr init mode - when set to port 4561 then will wait for the rtmgr to connect +# otherwise will connect to rtmgr like set below +RMR_INIT_SVC = b"4560" +MRC = None +xapp = None + +def signal_handler(sig, frame): + global server + global MRC + + server.stop() + rmr.rmr_close(MRC) + sys.exit(0) + +def RMR_init_xapp(initbind): + global RMR_INIT_SVC + # Init rmr + MRC = mrc = rmr.rmr_init(initbind, rmr.RMR_MAX_RCV_BYTES, 0x00) + while rmr.rmr_ready(mrc) == 0: + time.sleep(1) + print('[%d]::RMR not yet ready') + rmr.rmr_set_stimeout(mrc, 1) + sbuf = rmr.rmr_alloc_msg(mrc, 500) + rmr.rmr_set_vlevel(5) + print('[%d]::RMR ready') + return mrc, sbuf + +def Subscribe(subscriber): + # setup the subscription data + subEndPoint = subscriber.SubscriptionParamsClientEndpoint("localhost", 8091, 4061) + subsDirective = subscriber.SubscriptionParamsE2SubscriptionDirectives(10, 2, False) + subsequentAction = subscriber.SubsequentAction("continue", "w10ms") + actionDefinitionList = subscriber.ActionToBeSetup(1, "policy", (11,12,13,14,15), subsequentAction) + subsDetail = subscriber.SubscriptionDetail(12110, (1,2,3,4,5), actionDefinitionList) + # subscription data ready, make the subscription + subObj = subscriber.SubscriptionParams("sub10", subEndPoint,"gnb123456",1231, subsDirective, subsDetail) + print(subObj.to_dict()) + # subscribe + data, reason, status = subscriber.Subscribe(subObj) + # returns the json data, make it dictionary + print(json.loads(data)) + + #data, st, hdrs = api_instance.call_api(method="POST", resource_path="/ric/v1", body=subObj.to_dict()) + #print(hdrs) + #print(data) + + #response = api_instance.request(method="POST", url="http://127.0.0.1:8088/ric/v1", headers=None, body=subObj.to_dict()) + #print(response.getheaders()) + #print(respdict['SubscriptionResponse']) + +def Unsubscribe(subscriber): + reason, status = subscriber.UnSubscribe("ygwefwebw") + print(data) + print(reason) + print(status) + +def QuerySubscribtions(subscriber): + data, reason, status = subscriber.QuerySubscriptions() + print(data) + print(reason) + print(status) + +def read_file(filename): + try: + with open(filename, 'r') as f: + data = f.read() + if len(data) == 0: + return None + return data + except IOError as error: + return None + +def getSymptomData(symptomHndl, uriparams): + paramlist = urllib.parse.parse_qs(uriparams) + [x.upper() for x in paramlist] + fromtime = 0 + totime = 0 + print(paramlist) + if paramlist.get('fromTime'): + fromtime = getSeconds(paramlist.get('fromTime')[0]) + if paramlist.get('toTime'): + totime = getSeconds(paramlist.get('toTime')[0]) + zipfile = symptomHndl.collect("symptomdata"+'-%Y-%m-%d-%H-%M-%S.zip', ('examples/.*.py',), fromtime, totime) + if zipfile != None: + (zipfile, size, data) = symptomHndl.read() + return (zipfile, size, data) + return (None, 0, None) + +def healthyGetReadyHandler(name, path, data, ctype): + print(name) + print(path) + + response = server.initResponse() + response['payload'] = ("{'status': 'ready'}") + return response + +def healthyGetAliveHandler(name, path, data, ctype): + print(name) + print(path) + + response = server.initResponse() + response['payload'] = ("{'status': 'alive'}") + return response + +def subsResponseCB(name, path, data, ctype): + print(name) + print(path) + + response = server.initResponse() + response['payload'] = ("{}") + return response + +def symptomdataGetHandler(name, path, data, ctype): + reponse = ricrest.initResponse() + (zipfile, size, filedata) = getSymptomData(symptomHndl, self.path[20:]) + if filedata != None: + reponse['payload'] = filedata + reponse['ctype'] = 'application/zip' + reponse['attachment'] = "symptomdata.zip" + reponse['mode'] = 'binary' + return reponse + logging.error("Symptom data does not exists") + reponse['response'] = 'System error - symptomdata does not exists' + reponse['status'] = 500 + return reponse + + +def main(): + global server + global xapp + global symptomHndl + + # init the default values + ADDRESS = "0.0.0.0" # bind to all interfaces + PORT = 8080 # web server listen port + + parser = argparse.ArgumentParser() + parser.add_argument('-port', dest='port', help='HTTP server listen port, default 3000', required=False, type=int) + parser.add_argument('-address', dest='address', help='IP listen address, default all interfaces', required=False, type=str) + parser.add_argument('-xapp', dest='xapp', help='xapp name', required=True, type=str) + parser.add_argument('-service', dest='service', help='xapp service name (same as pod host name)', required=True, type=str) + args = parser.parse_args() + + if args.port is not None: + PORT = args.port + if args.address is not None: + ADDRESS = args.address + + # handle the RMR_SEED_RT and RMR_RTG_SVC which is different in mcxapp + data = None + os.environ["RMR_SRC_ID"] = args.service + os.environ["RMR_LOG_VLEVEL"] = '4' + os.environ["RMR_RTG_SVC"] = "4561" + rmrseed = os.environ.get('RMR_SEED_RT') + if rmrseed is not None: + data = read_file(rmrseed) + if data is None: + print("RMR seed file %s does not exists or is empty" % (rmrseed)) + else: + print("RMR_SEED_RT seed file not set in environment") + data = read_file('uta-rtg.rt') + if data is not None: + os.environ['RMR_SEED_RT'] = "./uta-rtg.rt" + print("Setting the default RMR_SEED_RT=uta-rtg.rt - content:") + print(data) + else: + print("Try to export the RMR_SEED_RT file if your RMR is not getting ready") + + symptomHndl = Symptomdata(args.service, args.xapp, "/tmp/", "http://service-ricplt-lwsd-http:8080/ric/v1/lwsd", 10) + + # setup the subscription + subscriber = subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1") + + # create the thread HTTP server and set the uri handler callbacks + server = ricrest.ThreadedHTTPServer(ADDRESS, PORT) + # trick to get the own handler with defined + server.handler.add_handler(server.handler, "GET", "healthAlive", "/ric/v1/health/alive", healthyGetAliveHandler) + server.handler.add_handler(server.handler, "GET", "healthReady", "/ric/v1/health/ready", healthyGetReadyHandler) + server.handler.add_handler(server.handler, "GET", "symptomdata", "/ric/v1/symptomdata", symptomdataGetHandler) + # add as well the own subscription response callback handler + if subscriber.ResponseHandler(subsResponseCB, server) is not True: + print("Error when trying to set the subscription reponse callback") + server.start() + + mrc, sbuf = RMR_init_xapp(b"4560") + + Subscribe(subscriber) + + while True: + print("Waiting for a message, will timeout after 2000ms") + sbuf = rmr.rmr_torcv_msg(mrc, None, 2000) + summary = rmr.message_summary(sbuf) + if summary[rmr.RMR_MS_MSG_STATE] == 12: + print("Nothing received =(") + else: + print("Message received!: {}".format(summary)) + data = rmr.get_payload(sbuf) + rmr.rmr_free_msg(sbuf) + +if __name__ == '__main__': + signal.signal(signal.SIGQUIT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + main() + +