1 # ==================================================================================
2 # Copyright (c) 2020 HCL Technologies Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
22 from ricxappframe.xapp_frame import Xapp
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from mdclogpy import Logger
25 from ad_model import modelling, CAUSE
26 from ad_train import ModelTraining
27 from database import DATABASE, DUMMY
# Module-level SDL wrapper, constructed with use_fake_sdl=True.
# NOTE(review): presumably this selects an in-memory stand-in instead of a
# real SDL backend -- confirm against the ricxappframe documentation.
sdl = SDLWrapper(use_fake_sdl=True)

# Module-level logger used by the functions in this file.
logger = Logger(name=__name__)
37 """ If ML model is not present in the path, It will trigger training module to train the model.
38 Calls the predict function every 0.5 seconds (for now, as we are using simulated data).
43 schedule.every(0.5).seconds.do(predict, self)
45 schedule.run_pending()
56 if not os.path.isfile('src/model'):
57 mt = ModelTraining(db)
62 """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
63 Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
64 Get the acknowledgement of sent message from the traffic steering.
68 if db.data is not None:
69 if set(md.num).issubset(db.data.columns):
70 db.data = db.data.dropna(axis=0)
72 val = predict_anomaly(self, db.data)
74 logger.warning("Parameters does not match with of training data")
76 logger.warning("No data in last 1 second")
78 if (val is not None) and (len(val) > 2):
def predict_anomaly(self, df):
    """Label the samples in *df* and build the anomaly payload for Traffic Steering.

    The module-level model ``md`` predicts a label for every row, which is
    stored in a new ``Anomaly`` column; a ``Degradation`` column is added and
    initialised to the empty string. When at least one row is labelled ``1``,
    ``cp.cause(df, db)`` supplies the ``Anomaly``/``Degradation`` values and
    the rows still labelled ``1`` are serialised as JSON records.

    *df* is modified in place: besides the two new columns,
    ``RRU.PrbUsedDl`` is cast to float and the index is replaced by a
    1 ms-spaced date range that starts at the first original index value.

    Parameters
    ----------
    self :
        Not used by this function; kept so existing callers that pass the
        xApp object keep working.
    df : pandas.DataFrame
        Samples to classify. Must contain the column ``RRU.PrbUsedDl`` and
        the column named by ``db.ue``.

    Returns
    -------
    bytes or None
        UTF-8 encoded JSON list with one record per anomalous row, holding
        the ``db.ue`` column, ``time`` (the row's index value) and
        ``Degradation``. ``None`` when *df* is empty or no row was labelled
        anomalous by the model. The list may be empty (``b"[]"``) if
        ``cp.cause`` clears every anomaly label.

    NOTE(review): the previous docstring also promised that the labelled
    samples are written to the AD measurement; no such write is visible in
    this function -- confirm where that happens.
    """
    # An empty frame cannot be labelled, and df.index[0] below would raise
    # IndexError on it, so report "nothing anomalous" straight away.
    if len(df) == 0:
        return None

    df['Anomaly'] = md.predict(df)
    df.loc[:, 'Degradation'] = ''

    # Bind the result up front so the no-anomaly path returns None instead
    # of leaving the name undefined.
    val = None
    if 1 in df.Anomaly.unique():
        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
        df_a = df.loc[df['Anomaly'] == 1].copy()
        df_a['time'] = df_a.index
        cols = [db.ue, 'time', 'Degradation']
        # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
        result = json.loads(df_a.loc[:, cols].to_json(orient='records'))
        val = json.dumps(result).encode()

    # NOTE(review): these two statements do not depend on the anomaly branch
    # and are therefore kept outside it -- confirm the intended nesting.
    df.loc[:, 'RRU.PrbUsedDl'] = df['RRU.PrbUsedDl'].astype('float')
    df.index = pd.date_range(start=df.index[0], periods=len(df), freq='1ms')
    return val
def msg_to_ts(self, val):
    """Send the anomaly payload to Traffic Steering and read back pending messages.

    Parameters
    ----------
    self :
        xApp object; ``rmr_send``, ``rmr_get_messages`` and ``rmr_free`` are
        called on it.
    val : bytes
        Payload sent with RMR message type 30003.

    Every message currently returned by ``rmr_get_messages()`` is logged as
    an acknowledgement and its buffer is released afterwards.
    """
    # send message from ad to ts
    logger.debug("Sending Anomalous UE to TS")
    success = self.rmr_send(val, 30003)
    # Only claim success when rmr_send reported it; surface failures instead
    # of silently ignoring the return value.
    if success:
        logger.info(" Message to TS: message sent Successfully")
    else:
        logger.warning("Message to TS: sending failed")

    # rmr receive to get the acknowledgement message from the traffic steering.
    for (summary, sbuf) in self.rmr_get_messages():
        try:
            logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
        finally:
            # Received RMR buffers must be handed back once processed,
            # otherwise they leak.
            self.rmr_free(sbuf)
def connectdb(thread=False):
    """Call ``connect()`` on the module-level ``db`` object.

    Author's note carried over from the original comment: "Create a
    connection to InfluxDB if thread=True, otherwise it will create a dummy
    data instance".

    NOTE(review): in the lines reviewed here ``thread`` is never read,
    ``db`` is never assigned and the value stored in ``success`` is never
    checked, so the selection described above could not be confirmed --
    verify against the complete function.
    """
    success = db.connect()
def start(thread=False):
    """Create the xApp object for the anomaly-detection application.

    Builds an ``Xapp`` with ``entry`` as its entrypoint, RMR port 4560 and
    ``use_fake_sdl=False``, then emits a debug message through the xApp's
    own logger.

    NOTE(review): the original comment says entry() is run through
    ``xapp.run()``; that call is not among the lines reviewed here, and
    ``thread`` is not read in them either -- confirm against the complete
    function.
    """
    xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
    xapp.logger.debug("AD xApp starting")