+++ /dev/null
-# ==================================================================================
-# Copyright (c) 2020 HCL Technologies Limited.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==================================================================================
-
-import json
-import os
-import pandas as pd
-import schedule
-from ricxappframe.xapp_frame import Xapp
-from ad_model.ad_model import ad_predict, CAUSE
-from ad_train import train
-from ricxappframe.xapp_sdl import SDLWrapper
-from database import DATABASE, DUMMY
-import insert as ins
-
-db = None
-cp = None
-ue_data = None # needs to be updated in future when live feed will be coming through KPIMON to influxDB
-pos = 0
-sdl = SDLWrapper(use_fake_sdl=True)
-
-
-def entry(self):
- """ If ML model is not present in the path, It will trigger training module to train the model.
- Calls predict function every 10 millisecond(for now as we are using simulated data).
- """
- if not os.path.isfile('model'):
- train()
- schedule.every(0.01).seconds.do(predict, self)
- while True:
- schedule.run_pending()
-
-
-def predict(self):
- """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
- Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
- Get the acknowledgement of sent message from the traffic steering.
- """
-
- global pos
- pos = (pos + 1) % len(ue_data) # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB
- sample = ue_data[pos]
- ue_df = pd.DataFrame([sample], columns=db.data.columns)
- val = predict_anomaly(self, ue_df)
- if (val is not None) and (len(val) > 2):
- msg_to_ts(self, val)
-
-
-def predict_anomaly(self, df):
- """ calls ad_predict to detect if given sample is normal or anomalous
- find out the degradation type if sample is anomalous
- write given sample along with predicted label to AD measurement
-
- Parameter
- ........
- ue: array or dataframe
-
- Return
- ......
- val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
- """
- pred = ad_predict(df)
- df['Anomaly'] = pred
- df['Degradation'] = ''
- val = None
- if 1 in pred:
- deg = cp.cause(df)
- if deg:
- df['Degradation'] = deg
- db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']]
-
- # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
- result = json.loads(db_df.to_json(orient='records'))
- val = json.dumps(result).encode()
- df.loc[db_df.index, 'Degradation'] = db_df['Degradation']
- df.index = df.measTimeStampRf
- result = json.loads(df.to_json(orient='records'))
-
- df = df.drop('measTimeStampRf', axis=1)
- db.write_anomaly(df, 'AD')
- return val
-
-
-def msg_to_ts(self, val):
- # send message from ad to ts
- print("[INFO] Sending Anomalous UE to TS")
- success = self.rmr_send(val, 30003)
- if success:
- print("[INFO] Message to TS: message sent Successfully")
- # rmr receive to get the acknowledgement message from the traffic steering.
- for (summary, sbuf) in self.rmr_get_messages():
- print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
- self.rmr_free(sbuf)
-
-
-def connectdb(thread=False):
- # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
- global db
- global cp
- global ue_data
- if thread:
- db = DUMMY()
- else:
- ins.populatedb() # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB
-
- db = DATABASE('UEData')
- db.read_data("liveUE")
- ue_data = db.data.values.tolist() # needs to be updated in future when live feed will be coming through KPIMON to influxDB
- cp = CAUSE(db)
-
-
-def start(thread=False):
- # Initiates xapp api and runs the entry() using xapp.run()
- xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
- connectdb(thread)
- xapp.run()