Release 1.0.1
[ric-app/ad.git] / src / main.py
1 # ==================================================================================
2 #  Copyright (c) 2020 HCL Technologies Limited.
3 #
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 # ==================================================================================
16
17 import json
18 import os
19 import time
20 import pandas as pd
21 import schedule
22 from ricxappframe.xapp_frame import Xapp, rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from mdclogpy import Logger
25 from ad_model import modelling, CAUSE
26 from ad_train import ModelTraining
27 from database import DATABASE, DUMMY
28
29 db = None
30 cp = None
31 threshold = None
32 sdl = SDLWrapper(use_fake_sdl=True)
33
34 logger = Logger(name=__name__)
35
36
37 def entry(self):
38     """  If ML model is not present in the path, It will trigger training module to train the model.
39       Calls predict function every 10 millisecond(for now as we are using simulated data).
40     """
41     connectdb()
42     train_model()
43     load_model()
44     schedule.every(0.5).seconds.do(predict, self)
45     while True:
46         schedule.run_pending()
47
48
49 def load_model():
50     global md
51     global cp
52     global threshold
53     md = modelling()
54     cp = CAUSE()
55     threshold = 70
56     logger.info("throughput threshold parameter is set as {}% (default)".format(threshold))
57
58
59 def train_model():
60     if not os.path.isfile('src/model'):
61         mt = ModelTraining(db)
62         mt.train()
63
64
65 def predict(self):
66     """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
67       Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
68       Get the acknowledgement of sent message from the traffic steering.
69     """
70     db.read_data()
71     val = None
72     if db.data is not None:
73         if set(md.num).issubset(db.data.columns):
74             db.data = db.data.dropna(axis=0)
75             if len(db.data) > 0:
76                 val = predict_anomaly(self, db.data)
77         else:
78             logger.warning("Parameters does not match with of training data")
79     else:
80         logger.warning("No data in last 1 second")
81         time.sleep(1)
82     if (val is not None) and (len(val) > 2):
83         msg_to_ts(self, val)
84
85
86 def predict_anomaly(self, df):
87     """ calls ad_predict to detect if given sample is normal or anomalous
88     find out the degradation type if sample is anomalous
89     write given sample along with predicted label to AD measurement
90
91     Parameter
92     ........
93     ue: array or dataframe
94
95     Return
96     ......
97     val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
98     """
99     df['Anomaly'] = md.predict(df)
100     df.loc[:, 'Degradation'] = ''
101     val = None
102     if 1 in df.Anomaly.unique():
103         df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold)
104         df_a = df.loc[df['Anomaly'] == 1].copy()
105         if len(df_a) > 0:
106             df_a['time'] = df_a.index
107             cols = [db.ue, 'time', 'Degradation']
108             # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
109             result = json.loads(df_a.loc[:, cols].to_json(orient='records'))
110             val = json.dumps(result).encode()
111     df.loc[:, 'RRU.PrbUsedDl'] = df['RRU.PrbUsedDl'].astype('float')
112     df.index = pd.date_range(start=df.index[0], periods=len(df), freq='1ms')
113     db.write_anomaly(df)
114     return val
115
116
117 def msg_to_ts(self, val):
118     # send message from ad to ts
119     logger.debug("Sending Anomalous UE to TS")
120     success = self.rmr_send(val, 30003)
121     if success:
122         logger.info(" Message to TS: message sent Successfully")
123         # rmr receive to get the acknowledgement message from the traffic steering.
124
125     for summary, sbuf in self.rmr_get_messages():
126         if sbuf.contents.mtype == 30004:
127             logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
128         if sbuf.contents.mtype == 20010:
129             a1_request_handler(self, summary, sbuf)
130         self.rmr_free(sbuf)
131
132
133 def connectdb(thread=False):
134     # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
135     global db
136     if thread:
137         db = DUMMY()
138     else:
139         db = DATABASE()
140     success = False
141     while not success:
142         success = db.connect()
143
144
145 def a1_request_handler(self, summary, sbuf):
146     logger.info("A1 policy received")
147     try:
148         req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
149         logger.debug("A1PolicyHandler.resp_handler:: Handler processing request")
150     except (json.decoder.JSONDecodeError, KeyError):
151         logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request")
152         return
153
154     if verifyPolicy(req):
155         logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req))
156     else:
157         logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req))
158     logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req))
159     change_threshold(self, req)
160     resp = buildPolicyResp(self, req)
161     self.rmr_send(json.dumps(resp).encode(), 20011)
162     logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp))
163     self.rmr_free(sbuf)
164
165
166 def change_threshold(self, req: dict):
167     if req["operation"] == "CREATE":
168         payload = req["payload"]
169         threshold = json.loads(payload)[db.a1_param]
170         logger.info("throughput threshold parameter updated to: {}% ".format(threshold))
171
172
173 def verifyPolicy(req: dict):
174     for i in ["policy_type_id", "operation", "policy_instance_id"]:
175         if i not in req:
176             return False
177     return True
178
179
180 def buildPolicyResp(self, req: dict):
181     req["handler_id"] = "ad"
182     del req["operation"]
183     del req["payload"]
184     req["status"] = "OK"
185     return req
186
187
188 def start(thread=False):
189     # Initiates xapp api and runs the entry() using xapp.run()
190     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
191     logger.debug("AD xApp starting")
192     xapp.run()