X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ad%2Fmain.py;h=3298b5f8c0ce931d3770b43cd8d80875ddcaa4d2;hb=f98ee76af036d60b8f5077105830ed61a13ed5aa;hp=9be6dc9e8acd9e1e387306972458fe3f24b3a78d;hpb=297dbd6245ec69571c8ad7091a18cbe9c7ba2488;p=ric-app%2Fad.git diff --git a/ad/main.py b/ad/main.py index 9be6dc9..3298b5f 100644 --- a/ad/main.py +++ b/ad/main.py @@ -14,60 +14,69 @@ # limitations under the License. # ================================================================================== -import warnings import json import os from ricxappframe.xapp_frame import Xapp import pandas as pd from ad_model.tb_format import parse from ad_model.ad_model import HDB_PREDICT -import schedule, time +import schedule from ad_train import train + def entry(self): """ - If RF model is not present in the path, run train() to train the model for the prediction. - Calls predict function for every 1 second(for now as we are using simulated data). + If RF model is not present in the path, run train() to train the model for the prediction. + Calls predict function for every 1 second(for now as we are using simulated data). """ - if not os.path.isfile('/tmp/ad/RF'): + if not os.path.isfile('ad/RF'): train() schedule.every(1).seconds.do(predict, self) while True: schedule.run_pending() + def predict(self): """ - Read the input csv file that has both normal and anomalous data. - Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set - Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003) - Get the acknowledgement message from the traffic steering. + Read the input csv file that has both normal and anomalous data. + Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set + Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003) + Get the acknowledgement message from the traffic steering. """ + val = predict_anomaly(self) + if len(val) > 2: + msg_to_ts(self, val) + - #The read_csv logic will be modified when we are going to fetch the data from database via sdl api. - #Read the input csv file - ue_data = pd.read_csv('/tmp/ad/ue_test.csv') +def predict_anomaly(self): + # The read_csv logic will be modified when we are going to fetch the data from database via sdl api. + # Read the input csv file + ue_data = pd.read_csv('ad/ue_test.csv') - #Parse the ue data and predict the anomaly records for the randomly selected UEID + # Parse the ue data and predict the anomaly records for the randomly selected UEID data = parse(ue_data) db_df = HDB_PREDICT(data) - db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID','MeasTimestampRF' ]].head(1) - db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x : str(x)) # converts into string format - #print("db_df: ", db_df) # For debug purpose, we can enable this print statement + db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID', 'MeasTimestampRF']].head(1) + db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x: str(x)) # converts into string format # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback - result = json.loads(db_df.to_json(orient = 'records')) + result = json.loads(db_df.to_json(orient='records')) val = json.dumps(result).encode() + return val + + +def msg_to_ts(self, val): + # send message from ad to ts + print("rmr send value:", val) + self.rmr_send(val, 30003) - if len(val) > 2 : - print("val: ", val) - self.rmr_send(val, 30003) + # rmr receive to get the acknowledgement message from the traffic steering. + for (summary, sbuf) in self.rmr_get_messages(): + print("TS_ANOMALY_ACK: {}".format(summary)) + self.rmr_free(sbuf) - # rmr receive to get the acknowledgement message from the traffic steering. - for (summary, sbuf) in self.rmr_get_messages(): - print("TS_ANOMALY_ACK: {}".format(summary)) - self.rmr_free(sbuf) - -# Initiates xapp api and runs the entry() using xapp.run() -xapp = Xapp(entrypoint=entry, rmr_port=4564, use_fake_sdl=True) -xapp.run() +def start(): + # Initiates xapp api and runs the entry() using xapp.run() + xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True) + xapp.run()