1 # ==================================================================================
2 # Copyright (c) 2020 HCL Technologies Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
21 from ricxappframe.xapp_frame import Xapp
22 from ad_model.ad_model import ad_predict, CAUSE
23 from ad_train import train
24 from ricxappframe.xapp_sdl import SDLWrapper
25 from database import DATABASE, DUMMY
30 ue_data = None # needs to be updated in future when live feed will be coming through KPIMON to influxDB
32 sdl = SDLWrapper(use_fake_sdl=True)
36 """ If ML model is not present in the path, It will trigger training module to train the model.
37 Calls predict function every 10 millisecond(for now as we are using simulated data).
39 if not os.path.isfile('model'):
41 schedule.every(0.01).seconds.do(predict, self)
43 schedule.run_pending()
47 """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
48 Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
49 Get the acknowledgement of sent message from the traffic steering.
53 pos = (pos + 1) % len(ue_data) # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB
55 ue_df = pd.DataFrame([sample], columns=db.data.columns)
56 val = predict_anomaly(self, ue_df)
57 if (val is not None) and (len(val) > 2):
61 def predict_anomaly(self, df):
62 """ calls ad_predict to detect if given sample is normal or anomalous
63 find out the degradation type if sample is anomalous
64 write given sample along with predicted label to AD measurement
68 ue: array or dataframe
72 val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
76 df['Degradation'] = ''
81 df['Degradation'] = deg
82 db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']]
84 # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
85 result = json.loads(db_df.to_json(orient='records'))
86 val = json.dumps(result).encode()
87 df.loc[db_df.index, 'Degradation'] = db_df['Degradation']
88 df.index = df.measTimeStampRf
89 result = json.loads(df.to_json(orient='records'))
91 df = df.drop('measTimeStampRf', axis=1)
92 db.write_anomaly(df, 'AD')
96 def msg_to_ts(self, val):
97 # send message from ad to ts
98 print("[INFO] Sending Anomalous UE to TS")
99 success = self.rmr_send(val, 30003)
101 print("[INFO] Message to TS: message sent Successfully")
102 # rmr receive to get the acknowledgement message from the traffic steering.
103 for (summary, sbuf) in self.rmr_get_messages():
104 print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
108 def connectdb(thread=False):
109 # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
116 ins.populatedb() # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB
118 db = DATABASE('UEData')
119 db.read_data("liveUE")
120 ue_data = db.data.values.tolist() # needs to be updated in future when live feed will be coming through KPIMON to influxDB
124 def start(thread=False):
125 # Initiates xapp api and runs the entry() using xapp.run()
126 xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)