RIC-642 related changes: REST subscription, rnib enhancements, symptomdata, rest...
[ric-plt/xapp-frame-py.git] / examples / xapp_subscribe.py
1 #!/usr/bin/env python3
2 # ==================================================================================
3 #       Copyright (c) 2022 Nokia
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import os
18 import sys
19 import time
20 import json
21 import logging
22 import datetime
23 import argparse
24 import threading
25 import http.server
26 import signal
27 import struct
28 import socket
29 import urllib.parse
30 from io import open
31 from time import gmtime, strftime
32
33 #sys.path.insert(0, os.path.abspath("./"))
34 #sys.path.insert(0, os.path.abspath("./ricxappframe"))
35 sys.path.append(os.getcwd())
36 from ricxappframe.xapp_frame import RMRXapp, rmr
37 from ricxappframe.xapp_sdl import SDLWrapper
38 from ricxappframe.xapp_symptomdata import Symptomdata
39 import ricxappframe.xapp_subscribe as subscribe
40 import ricxappframe.xapp_rest as ricrest
41
42 # rmr init mode - when set to port 4561 then will wait for the rtmgr to connect
43 # otherwise will connect to rtmgr like set below
44 RMR_INIT_SVC = b"4560"
45 MRC = None
46 xapp = None
47
48 def signal_handler(sig, frame):
49     global server
50     global MRC
51     
52     server.stop()
53     rmr.rmr_close(MRC)
54     sys.exit(0)
55
56 def RMR_init_xapp(initbind):
57     global RMR_INIT_SVC
58     # Init rmr
59     MRC = mrc = rmr.rmr_init(initbind, rmr.RMR_MAX_RCV_BYTES, 0x00)
60     while rmr.rmr_ready(mrc) == 0:
61         time.sleep(1)
62         print('[%d]::RMR not yet ready')
63     rmr.rmr_set_stimeout(mrc, 1)
64     sbuf = rmr.rmr_alloc_msg(mrc, 500)
65     rmr.rmr_set_vlevel(5)
66     print('[%d]::RMR ready')
67     return mrc, sbuf
68
69 def Subscribe(subscriber):
70     # setup the subscription data
71     subEndPoint = subscriber.SubscriptionParamsClientEndpoint("localhost", 8091, 4061)
72     subsDirective = subscriber.SubscriptionParamsE2SubscriptionDirectives(10, 2, False)
73     subsequentAction = subscriber.SubsequentAction("continue", "w10ms")
74     actionDefinitionList = subscriber.ActionToBeSetup(1, "policy", (11,12,13,14,15), subsequentAction)
75     subsDetail = subscriber.SubscriptionDetail(12110, (1,2,3,4,5), actionDefinitionList)
76     # subscription data ready, make the subscription
77     subObj = subscriber.SubscriptionParams("sub10", subEndPoint,"gnb123456",1231, subsDirective, subsDetail)
78     print(subObj.to_dict())
79     # subscribe
80     data, reason, status  = subscriber.Subscribe(subObj)
81     # returns the json data, make it dictionary
82     print(json.loads(data))
83
84     #data, st, hdrs   = api_instance.call_api(method="POST", resource_path="/ric/v1", body=subObj.to_dict())
85     #print(hdrs)
86     #print(data)
87
88     #response = api_instance.request(method="POST", url="http://127.0.0.1:8088/ric/v1", headers=None, body=subObj.to_dict())
89     #print(response.getheaders())
90     #print(respdict['SubscriptionResponse'])
91
92 def Unsubscribe(subscriber):
93     reason, status  = subscriber.UnSubscribe("ygwefwebw")
94     print(data)
95     print(reason)
96     print(status)
97
98 def QuerySubscribtions(subscriber):
99     data, reason, status  = subscriber.QuerySubscriptions()
100     print(data)
101     print(reason)
102     print(status)
103
104 def read_file(filename):
105     try:
106         with open(filename, 'r') as f:
107             data = f.read()
108             if len(data) == 0:
109                 return None
110             return data
111     except IOError as error:
112         return None
113
114 def getSymptomData(symptomHndl, uriparams):
115     paramlist = urllib.parse.parse_qs(uriparams)
116     [x.upper() for x in paramlist]
117     fromtime = 0
118     totime = 0
119     print(paramlist)
120     if paramlist.get('fromTime'):
121         fromtime = getSeconds(paramlist.get('fromTime')[0])
122     if paramlist.get('toTime'):
123         totime = getSeconds(paramlist.get('toTime')[0])
124     zipfile = symptomHndl.collect("symptomdata"+'-%Y-%m-%d-%H-%M-%S.zip', ('examples/.*.py',), fromtime, totime)
125     if zipfile != None:
126         (zipfile, size, data) = symptomHndl.read()
127         return (zipfile, size, data)
128     return (None, 0, None)
129        
130 def healthyGetReadyHandler(name, path, data, ctype):
131     print(name)
132     print(path)
133    
134     response = server.initResponse()
135     response['payload'] = ("{'status': 'ready'}")
136     return response
137
138 def healthyGetAliveHandler(name, path, data, ctype):
139     print(name)
140     print(path)
141    
142     response = server.initResponse()
143     response['payload'] = ("{'status': 'alive'}")
144     return response
145         
146 def subsResponseCB(name, path, data, ctype):
147     print(name)
148     print(path)
149    
150     response = server.initResponse()
151     response['payload'] = ("{}")
152     return response
153
154 def symptomdataGetHandler(name, path, data, ctype):
155     reponse = ricrest.initResponse()
156     (zipfile, size, filedata) = getSymptomData(symptomHndl, self.path[20:])
157     if filedata != None:
158         reponse['payload'] = filedata
159         reponse['ctype'] = 'application/zip'
160         reponse['attachment'] = "symptomdata.zip"
161         reponse['mode'] = 'binary'
162         return reponse
163     logging.error("Symptom data does not exists")
164     reponse['response'] = 'System error - symptomdata does not exists'
165     reponse['status'] = 500
166     return reponse
167
168
169 def main():
170     global server
171     global xapp
172     global symptomHndl
173     
174     # init the default values
175     ADDRESS = "0.0.0.0"     # bind to all interfaces
176     PORT = 8080             # web server listen port
177     
178     parser = argparse.ArgumentParser()
179     parser.add_argument('-port', dest='port', help='HTTP server listen port, default 3000', required=False, type=int)
180     parser.add_argument('-address', dest='address', help='IP listen address, default all interfaces', required=False, type=str)
181     parser.add_argument('-xapp', dest='xapp', help='xapp name', required=True, type=str)
182     parser.add_argument('-service', dest='service', help='xapp service name (same as pod host name)', required=True, type=str)
183     args = parser.parse_args()
184     
185     if args.port is not None:
186         PORT = args.port
187     if args.address is not None:
188         ADDRESS = args.address
189
190     # handle the RMR_SEED_RT and RMR_RTG_SVC which is different in mcxapp
191     data = None
192     os.environ["RMR_SRC_ID"] = args.service
193     os.environ["RMR_LOG_VLEVEL"] = '4'
194     os.environ["RMR_RTG_SVC"] = "4561"
195     rmrseed = os.environ.get('RMR_SEED_RT')
196     if rmrseed is not None:
197         data = read_file(rmrseed)
198         if data is None:
199             print("RMR seed file %s does not exists or is empty" % (rmrseed))
200     else:
201         print("RMR_SEED_RT seed file not set in environment")
202         data = read_file('uta-rtg.rt')
203         if data is not None:
204             os.environ['RMR_SEED_RT'] = "./uta-rtg.rt"
205             print("Setting the default RMR_SEED_RT=uta-rtg.rt - content:")
206             print(data)
207         else:
208             print("Try to export the RMR_SEED_RT file if your RMR is not getting ready")
209
210     symptomHndl = Symptomdata(args.service, args.xapp, "/tmp/", "http://service-ricplt-lwsd-http:8080/ric/v1/lwsd", 10)
211     
212     # setup the subscription
213     subscriber = subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1")
214     
215     # create the thread HTTP server and set the uri handler callbacks
216     server = ricrest.ThreadedHTTPServer(ADDRESS, PORT)
217     # trick to get the own handler with defined 
218     server.handler.add_handler(server.handler, "GET", "healthAlive", "/ric/v1/health/alive", healthyGetAliveHandler)
219     server.handler.add_handler(server.handler, "GET", "healthReady", "/ric/v1/health/ready", healthyGetReadyHandler)
220     server.handler.add_handler(server.handler, "GET", "symptomdata", "/ric/v1/symptomdata", symptomdataGetHandler)
221     # add as well the own subscription response callback handler
222     if subscriber.ResponseHandler(subsResponseCB, server) is not True:
223         print("Error when trying to set the subscription reponse callback")
224     server.start()
225
226     mrc, sbuf = RMR_init_xapp(b"4560")
227
228     Subscribe(subscriber)
229
230     while True:
231         print("Waiting for a message, will timeout after 2000ms")
232         sbuf = rmr.rmr_torcv_msg(mrc, None, 2000)
233         summary = rmr.message_summary(sbuf)
234         if summary[rmr.RMR_MS_MSG_STATE] == 12:
235             print("Nothing received =(")
236         else:
237             print("Message received!: {}".format(summary))
238             data = rmr.get_payload(sbuf)
239         rmr.rmr_free_msg(sbuf)
240
241 if __name__ == '__main__':
242     signal.signal(signal.SIGQUIT, signal_handler)
243     signal.signal(signal.SIGTERM, signal_handler)
244     signal.signal(signal.SIGINT, signal_handler)
245     main()
246
247