f2a5bafa0e5a4b402789711149d80ab820609668
[ric-app/ad.git] / src / main.py
1 # ==================================================================================
2 #  Copyright (c) 2020 HCL Technologies Limited.
3 #
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 # ==================================================================================
16
17 import json
18 import os
19 import time
20 import pandas as pd
21 import schedule
22 from ricxappframe.xapp_frame import Xapp
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from mdclogpy import Logger
25 from ad_model import modelling, CAUSE
26 from ad_train import ModelTraining
27 from database import DATABASE, DUMMY
28
29 db = None
30 cp = None
31 sdl = SDLWrapper(use_fake_sdl=True)
32
33 logger = Logger(name=__name__)
34
35
36 def entry(self):
37     """  If ML model is not present in the path, It will trigger training module to train the model.
38       Calls predict function every 10 millisecond(for now as we are using simulated data).
39     """
40     connectdb()
41     train_model()
42     load_model()
43     schedule.every(0.5).seconds.do(predict, self)
44     while True:
45         schedule.run_pending()
46
47
48 def load_model():
49     global md
50     global cp
51     md = modelling()
52     cp = CAUSE()
53
54
55 def train_model():
56     if not os.path.isfile('src/model'):
57         mt = ModelTraining(db)
58         mt.train()
59
60
61 def predict(self):
62     """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
63       Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
64       Get the acknowledgement of sent message from the traffic steering.
65     """
66     db.read_data()
67     val = None
68     if db.data is not None:
69         if set(md.num).issubset(db.data.columns):
70             db.data = db.data.dropna(axis=0)
71             if len(db.data) > 0:
72                 val = predict_anomaly(self, db.data)
73         else:
74             logger.warning("Parameters does not match with of training data")
75     else:
76         logger.warning("No data in last 1 second")
77         time.sleep(1)
78     if (val is not None) and (len(val) > 2):
79         msg_to_ts(self, val)
80
81
82 def predict_anomaly(self, df):
83     """ calls ad_predict to detect if given sample is normal or anomalous
84     find out the degradation type if sample is anomalous
85     write given sample along with predicted label to AD measurement
86
87     Parameter
88     ........
89     ue: array or dataframe
90
91     Return
92     ......
93     val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
94     """
95     df['Anomaly'] = md.predict(df)
96     df.loc[:, 'Degradation'] = ''
97     val = None
98     if 1 in df.Anomaly.unique():
99         df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
100         df_a = df.loc[df['Anomaly'] == 1].copy()
101         if len(df_a) > 0:
102             df_a['time'] = df_a.index
103             cols = [db.ue, 'time', 'Degradation']
104             # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
105             result = json.loads(df_a.loc[:, cols].to_json(orient='records'))
106             val = json.dumps(result).encode()
107     df.loc[:, 'RRU.PrbUsedDl'] = df['RRU.PrbUsedDl'].astype('float')
108     df.index = pd.date_range(start=df.index[0], periods=len(df), freq='1ms')
109     db.write_anomaly(df)
110     return val
111
112
113 def msg_to_ts(self, val):
114     # send message from ad to ts
115     logger.debug("Sending Anomalous UE to TS")
116     success = self.rmr_send(val, 30003)
117     if success:
118         logger.info(" Message to TS: message sent Successfully")
119     # rmr receive to get the acknowledgement message from the traffic steering.
120     for (summary, sbuf) in self.rmr_get_messages():
121         logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
122         self.rmr_free(sbuf)
123
124
125 def connectdb(thread=False):
126     # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
127     global db
128     if thread:
129         db = DUMMY()
130     else:
131         db = DATABASE()
132     success = False
133     while not success:
134         success = db.connect()
135
136
137 def start(thread=False):
138     # Initiates xapp api and runs the entry() using xapp.run()
139     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
140     xapp.logger.debug("AD xApp starting")
141     xapp.run()