X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ad%2Fmain.py;h=5afb58f17f6b6ddde1f292ab7db05cc709fb424e;hb=refs%2Fheads%2Fe-release;hp=3298b5f8c0ce931d3770b43cd8d80875ddcaa4d2;hpb=f98ee76af036d60b8f5077105830ed61a13ed5aa;p=ric-app%2Fad.git diff --git a/ad/main.py b/ad/main.py index 3298b5f..5afb58f 100644 --- a/ad/main.py +++ b/ad/main.py @@ -16,67 +16,113 @@ import json import os -from ricxappframe.xapp_frame import Xapp import pandas as pd -from ad_model.tb_format import parse -from ad_model.ad_model import HDB_PREDICT import schedule +from ricxappframe.xapp_frame import Xapp +from ad_model.ad_model import ad_predict, CAUSE from ad_train import train +from ricxappframe.xapp_sdl import SDLWrapper +from database import DATABASE, DUMMY +import insert as ins + +db = None +cp = None +ue_data = None # needs to be updated in future when live feed will be coming through KPIMON to influxDB +pos = 0 +sdl = SDLWrapper(use_fake_sdl=True) def entry(self): + """ If ML model is not present in the path, It will trigger training module to train the model. + Calls predict function every 10 millisecond(for now as we are using simulated data). """ - If RF model is not present in the path, run train() to train the model for the prediction. - Calls predict function for every 1 second(for now as we are using simulated data). - """ - if not os.path.isfile('ad/RF'): + if not os.path.isfile('model'): train() - schedule.every(1).seconds.do(predict, self) + schedule.every(0.01).seconds.do(predict, self) while True: schedule.run_pending() def predict(self): + """Read the latest ue sample from influxDB and detects if that is anomalous or normal.. + Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003) + Get the acknowledgement of sent message from the traffic steering. """ - Read the input csv file that has both normal and anomalous data. - Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set - Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003) - Get the acknowledgement message from the traffic steering. - """ - val = predict_anomaly(self) - if len(val) > 2: + + global pos + pos = (pos + 1) % len(ue_data) # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB + sample = ue_data[pos] + ue_df = pd.DataFrame([sample], columns=db.data.columns) + val = predict_anomaly(self, ue_df) + if (val is not None) and (len(val) > 2): msg_to_ts(self, val) -def predict_anomaly(self): - # The read_csv logic will be modified when we are going to fetch the data from database via sdl api. - # Read the input csv file - ue_data = pd.read_csv('ad/ue_test.csv') +def predict_anomaly(self, df): + """ calls ad_predict to detect if given sample is normal or anomalous + find out the degradation type if sample is anomalous + write given sample along with predicted label to AD measurement - # Parse the ue data and predict the anomaly records for the randomly selected UEID - data = parse(ue_data) - db_df = HDB_PREDICT(data) - db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID', 'MeasTimestampRF']].head(1) - db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x: str(x)) # converts into string format + Parameter + ........ + ue: array or dataframe - # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback - result = json.loads(db_df.to_json(orient='records')) - val = json.dumps(result).encode() + Return + ...... + val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type) + """ + pred = ad_predict(df) + df['Anomaly'] = pred + df['Degradation'] = '' + val = None + if 1 in pred: + deg = cp.cause(df) + if deg: + df['Degradation'] = deg + db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']] + + # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback + result = json.loads(db_df.to_json(orient='records')) + val = json.dumps(result).encode() + df.loc[db_df.index, 'Degradation'] = db_df['Degradation'] + df.index = df.measTimeStampRf + result = json.loads(df.to_json(orient='records')) + + df = df.drop('measTimeStampRf', axis=1) + db.write_anomaly(df, 'AD') return val def msg_to_ts(self, val): # send message from ad to ts - print("rmr send value:", val) - self.rmr_send(val, 30003) - + print("[INFO] Sending Anomalous UE to TS") + success = self.rmr_send(val, 30003) + if success: + print("[INFO] Message to TS: message sent Successfully") # rmr receive to get the acknowledgement message from the traffic steering. for (summary, sbuf) in self.rmr_get_messages(): - print("TS_ANOMALY_ACK: {}".format(summary)) + print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary)) self.rmr_free(sbuf) -def start(): +def connectdb(thread=False): + # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance + global db + global cp + global ue_data + if thread: + db = DUMMY() + else: + ins.populatedb() # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB + + db = DATABASE('UEData') + db.read_data("liveUE") + ue_data = db.data.values.tolist() # needs to be updated in future when live feed will be coming through KPIMON to influxDB + cp = CAUSE(db) + + +def start(thread=False): # Initiates xapp api and runs the entry() using xapp.run() - xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True) + xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False) + connectdb(thread) xapp.run()