X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ad%2Fmain.py;h=5afb58f17f6b6ddde1f292ab7db05cc709fb424e;hb=refs%2Fheads%2Fe-release;hp=8156af3a1ab6bf4f3edad69b32dca5166b816dbf;hpb=d3dff387ed728a6a74a236eb9b65d59d4929714a;p=ric-app%2Fad.git diff --git a/ad/main.py b/ad/main.py index 8156af3..5afb58f 100644 --- a/ad/main.py +++ b/ad/main.py @@ -14,60 +14,115 @@ # limitations under the License. # ================================================================================== -import warnings import json import os -from ricxappframe.xapp_frame import Xapp import pandas as pd -from ad_model.tb_format import parse -from ad_model.ad_model import HDB_PREDICT -import schedule, time +import schedule +from ricxappframe.xapp_frame import Xapp +from ad_model.ad_model import ad_predict, CAUSE from ad_train import train +from ricxappframe.xapp_sdl import SDLWrapper +from database import DATABASE, DUMMY +import insert as ins + +db = None +cp = None +ue_data = None # needs to be updated in future when live feed will be coming through KPIMON to influxDB +pos = 0 +sdl = SDLWrapper(use_fake_sdl=True) + def entry(self): + """ If ML model is not present in the path, It will trigger training module to train the model. + Calls predict function every 10 millisecond(for now as we are using simulated data). """ - If RF model is not present in the path, run train() to train the model for the prediction. - Calls predict function for every 1 second(for now as we are using simulated data). - """ - if not os.path.isfile('/tmp/ad/RF'): + if not os.path.isfile('model'): train() - schedule.every(1).seconds.do(predict, self) + schedule.every(0.01).seconds.do(predict, self) while True: schedule.run_pending() + def predict(self): + """Read the latest ue sample from influxDB and detects if that is anomalous or normal.. + Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003) + Get the acknowledgement of sent message from the traffic steering. """ - Read the input csv file that has both normal and anomalous data. - Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set - Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003) - Get the acknowledgement message from the traffic steering. + + global pos + pos = (pos + 1) % len(ue_data) # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB + sample = ue_data[pos] + ue_df = pd.DataFrame([sample], columns=db.data.columns) + val = predict_anomaly(self, ue_df) + if (val is not None) and (len(val) > 2): + msg_to_ts(self, val) + + +def predict_anomaly(self, df): + """ calls ad_predict to detect if given sample is normal or anomalous + find out the degradation type if sample is anomalous + write given sample along with predicted label to AD measurement + + Parameter + ........ + ue: array or dataframe + + Return + ...... + val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type) """ + pred = ad_predict(df) + df['Anomaly'] = pred + df['Degradation'] = '' + val = None + if 1 in pred: + deg = cp.cause(df) + if deg: + df['Degradation'] = deg + db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']] + + # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback + result = json.loads(db_df.to_json(orient='records')) + val = json.dumps(result).encode() + df.loc[db_df.index, 'Degradation'] = db_df['Degradation'] + df.index = df.measTimeStampRf + result = json.loads(df.to_json(orient='records')) + + df = df.drop('measTimeStampRf', axis=1) + db.write_anomaly(df, 'AD') + return val + + +def msg_to_ts(self, val): + # send message from ad to ts + print("[INFO] Sending Anomalous UE to TS") + success = self.rmr_send(val, 30003) + if success: + print("[INFO] Message to TS: message sent Successfully") + # rmr receive to get the acknowledgement message from the traffic steering. + for (summary, sbuf) in self.rmr_get_messages(): + print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary)) + self.rmr_free(sbuf) + + +def connectdb(thread=False): + # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance + global db + global cp + global ue_data + if thread: + db = DUMMY() + else: + ins.populatedb() # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB + + db = DATABASE('UEData') + db.read_data("liveUE") + ue_data = db.data.values.tolist() # needs to be updated in future when live feed will be coming through KPIMON to influxDB + cp = CAUSE(db) - #The read_csv logic will be modified when we are going to fetch the data from database via sdl api. - #Read the input csv file - ue_data = pd.read_csv('/tmp/ad/ue_test.csv') - - #Parse the ue data and predict the anomaly records for the randomly selected UEID - data = parse(ue_data) - db_df = HDB_PREDICT(data) - db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID','MeasTimestampRF' ]].head(1) - db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x : str(x)) # converts into string format - #print("db_df: ", db_df) # For debug purpose, we can enable this print statement - - # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback - result = json.loads(db_df.to_json(orient = 'records')) - val = json.dumps(result).encode() - - if len(val) > 2 : - print("val: ", val) - self.rmr_send(val, 30003) - - # rmr receive to get the acknowledgement message from the traffic steering. - for (summary, sbuf) in self.rmr_get_messages(): - print("TS_ANOMALY_ACK: {}".format(summary)) - self.rmr_free(sbuf) - -# Initiates xapp api and runs the entry() using xapp.run() -xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True) -xapp.run() +def start(thread=False): + # Initiates xapp api and runs the entry() using xapp.run() + xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False) + connectdb(thread) + xapp.run()