[RICAPP-173] AD xApp Release version 0.0.2 (Dawn)
[ric-app/ad.git] / ad / main.py
1 # ==================================================================================
2 #  Copyright (c) 2020 HCL Technologies Limited.
3 #
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 # ==================================================================================
16
17 import json
18 import os
19 import pandas as pd
20 import schedule
21 from ricxappframe.xapp_frame import Xapp
22 from ad_model.ad_model import ad_predict, CAUSE
23 from ad_train import train
24 from ricxappframe.xapp_sdl import SDLWrapper
25 from database import DATABASE, DUMMY
26 import insert as ins
27
28 db = None
29 cp = None
30 ue_data = None  # needs to be updated in future when live feed will be coming through KPIMON to influxDB
31 pos = 0
32 sdl = SDLWrapper(use_fake_sdl=True)
33
34
35 def entry(self):
36     """  If ML model is not present in the path, It will trigger training module to train the model.
37       Calls predict function every 10 millisecond(for now as we are using simulated data).
38     """
39     if not os.path.isfile('model'):
40         train()
41     schedule.every(0.01).seconds.do(predict, self)
42     while True:
43         schedule.run_pending()
44
45
46 def predict(self):
47     """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
48       Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
49       Get the acknowledgement of sent message from the traffic steering.
50     """
51
52     global pos
53     pos = (pos + 1) % len(ue_data)  # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB
54     sample = ue_data[pos]
55     ue_df = pd.DataFrame([sample], columns=db.data.columns)
56     val = predict_anomaly(self, ue_df)
57     if (val is not None) and (len(val) > 2):
58         msg_to_ts(self, val)
59
60
61 def predict_anomaly(self, df):
62     """ calls ad_predict to detect if given sample is normal or anomalous
63     find out the degradation type if sample is anomalous
64     write given sample along with predicted label to AD measurement
65
66     Parameter
67     ........
68     ue: array or dataframe
69
70     Return
71     ......
72     val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
73     """
74     pred = ad_predict(df)
75     df['Anomaly'] = pred
76     df['Degradation'] = ''
77     val = None
78     if 1 in pred:
79         deg = cp.cause(df)
80         if deg:
81             df['Degradation'] = deg
82             db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']]
83
84             # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
85             result = json.loads(db_df.to_json(orient='records'))
86             val = json.dumps(result).encode()
87             df.loc[db_df.index, 'Degradation'] = db_df['Degradation']
88     df.index = df.measTimeStampRf
89     result = json.loads(df.to_json(orient='records'))
90
91     df = df.drop('measTimeStampRf', axis=1)
92     db.write_anomaly(df, 'AD')
93     return val
94
95
96 def msg_to_ts(self, val):
97     # send message from ad to ts
98     print("[INFO] Sending Anomalous UE to TS")
99     success = self.rmr_send(val, 30003)
100     if success:
101         print("[INFO] Message to TS: message sent Successfully")
102     # rmr receive to get the acknowledgement message from the traffic steering.
103     for (summary, sbuf) in self.rmr_get_messages():
104         print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
105         self.rmr_free(sbuf)
106
107
108 def connectdb(thread=False):
109     # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
110     global db
111     global cp
112     global ue_data
113     if thread:
114         db = DUMMY()
115     else:
116         ins.populatedb()  # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB
117
118         db = DATABASE('UEData')
119         db.read_data("liveUE")
120         ue_data = db.data.values.tolist()  # needs to be updated in future when live feed will be coming through KPIMON to influxDB
121     cp = CAUSE(db)
122
123
124 def start(thread=False):
125     # Initiates xapp api and runs the entry() using xapp.run()
126     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
127     connectdb(thread)
128     xapp.run()