# limitations under the License.
# ==================================================================================
-import warnings
import json
import os
-from ricxappframe.xapp_frame import Xapp
import pandas as pd
-from ad_model.tb_format import parse
-from ad_model.ad_model import HDB_PREDICT
-import schedule, time
+import schedule
+from ricxappframe.xapp_frame import Xapp
+from ad_model.ad_model import ad_predict, CAUSE
from ad_train import train
+from ricxappframe.xapp_sdl import SDLWrapper
+from database import DATABASE, DUMMY
+import insert as ins
+
+db = None
+cp = None
+ue_data = None # needs to be updated in future when live feed will be coming through KPIMON to influxDB
+pos = 0
+sdl = SDLWrapper(use_fake_sdl=True)
+
def entry(self):
+ """ If ML model is not present in the path, It will trigger training module to train the model.
+ Calls predict function every 10 millisecond(for now as we are using simulated data).
"""
- If RF model is not present in the path, run train() to train the model for the prediction.
- Calls predict function for every 1 second(for now as we are using simulated data).
- """
- if not os.path.isfile('/tmp/ad/RF'):
+ if not os.path.isfile('model'):
train()
- schedule.every(1).seconds.do(predict, self)
+ schedule.every(0.01).seconds.do(predict, self)
while True:
schedule.run_pending()
+
def predict(self):
+ """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
+ Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
+ Get the acknowledgement of sent message from the traffic steering.
"""
- Read the input csv file that has both normal and anomalous data.
- Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set
- Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003)
- Get the acknowledgement message from the traffic steering.
+
+ global pos
+ pos = (pos + 1) % len(ue_data) # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB
+ sample = ue_data[pos]
+ ue_df = pd.DataFrame([sample], columns=db.data.columns)
+ val = predict_anomaly(self, ue_df)
+ if (val is not None) and (len(val) > 2):
+ msg_to_ts(self, val)
+
+
+def predict_anomaly(self, df):
+ """ calls ad_predict to detect if given sample is normal or anomalous
+ find out the degradation type if sample is anomalous
+ write given sample along with predicted label to AD measurement
+
+ Parameter
+ ........
+ ue: array or dataframe
+
+ Return
+ ......
+ val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
"""
+ pred = ad_predict(df)
+ df['Anomaly'] = pred
+ df['Degradation'] = ''
+ val = None
+ if 1 in pred:
+ deg = cp.cause(df)
+ if deg:
+ df['Degradation'] = deg
+ db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']]
+
+ # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
+ result = json.loads(db_df.to_json(orient='records'))
+ val = json.dumps(result).encode()
+ df.loc[db_df.index, 'Degradation'] = db_df['Degradation']
+ df.index = df.measTimeStampRf
+ result = json.loads(df.to_json(orient='records'))
+
+ df = df.drop('measTimeStampRf', axis=1)
+ db.write_anomaly(df, 'AD')
+ return val
+
+
+def msg_to_ts(self, val):
+ # send message from ad to ts
+ print("[INFO] Sending Anomalous UE to TS")
+ success = self.rmr_send(val, 30003)
+ if success:
+ print("[INFO] Message to TS: message sent Successfully")
+ # rmr receive to get the acknowledgement message from the traffic steering.
+ for (summary, sbuf) in self.rmr_get_messages():
+ print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+ self.rmr_free(sbuf)
+
+
+def connectdb(thread=False):
+ # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
+ global db
+ global cp
+ global ue_data
+ if thread:
+ db = DUMMY()
+ else:
+ ins.populatedb() # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB
+
+ db = DATABASE('UEData')
+ db.read_data("liveUE")
+ ue_data = db.data.values.tolist() # needs to be updated in future when live feed will be coming through KPIMON to influxDB
+ cp = CAUSE(db)
- #The read_csv logic will be modified when we are going to fetch the data from database via sdl api.
- #Read the input csv file
- ue_data = pd.read_csv('/tmp/ad/ue_test.csv')
-
- #Parse the ue data and predict the anomaly records for the randomly selected UEID
- data = parse(ue_data)
- db_df = HDB_PREDICT(data)
- db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID','MeasTimestampRF' ]].head(1)
- db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x : str(x)) # converts into string format
- #print("db_df: ", db_df) # For debug purpose, we can enable this print statement
-
- # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
- result = json.loads(db_df.to_json(orient = 'records'))
- val = json.dumps(result).encode()
-
- if len(val) > 2 :
- print("val: ", val)
- self.rmr_send(val, 30003)
-
- # rmr receive to get the acknowledgement message from the traffic steering.
- for (summary, sbuf) in self.rmr_get_messages():
- print("TS_ANOMALY_ACK: {}".format(summary))
- self.rmr_free(sbuf)
-
-# Initiates xapp api and runs the entry() using xapp.run()
-xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True)
-xapp.run()
+def start(thread=False):
+ # Initiates xapp api and runs the entry() using xapp.run()
+ xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
+ connectdb(thread)
+ xapp.run()