X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ad%2Fmain.py;h=8156af3a1ab6bf4f3edad69b32dca5166b816dbf;hb=d3dff387ed728a6a74a236eb9b65d59d4929714a;hp=3298b5f8c0ce931d3770b43cd8d80875ddcaa4d2;hpb=f98ee76af036d60b8f5077105830ed61a13ed5aa;p=ric-app%2Fad.git diff --git a/ad/main.py b/ad/main.py index 3298b5f..8156af3 100644 --- a/ad/main.py +++ b/ad/main.py @@ -14,69 +14,60 @@ # limitations under the License. # ================================================================================== +import warnings import json import os from ricxappframe.xapp_frame import Xapp import pandas as pd from ad_model.tb_format import parse from ad_model.ad_model import HDB_PREDICT -import schedule +import schedule, time from ad_train import train - def entry(self): """ - If RF model is not present in the path, run train() to train the model for the prediction. - Calls predict function for every 1 second(for now as we are using simulated data). + If RF model is not present in the path, run train() to train the model for the prediction. + Calls predict function for every 1 second(for now as we are using simulated data). """ - if not os.path.isfile('ad/RF'): + if not os.path.isfile('/tmp/ad/RF'): train() schedule.every(1).seconds.do(predict, self) while True: schedule.run_pending() - def predict(self): """ - Read the input csv file that has both normal and anomalous data. - Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set - Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003) - Get the acknowledgement message from the traffic steering. + Read the input csv file that has both normal and anomalous data. + Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set + Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003) + Get the acknowledgement message from the traffic steering. """ - val = predict_anomaly(self) - if len(val) > 2: - msg_to_ts(self, val) - -def predict_anomaly(self): - # The read_csv logic will be modified when we are going to fetch the data from database via sdl api. - # Read the input csv file - ue_data = pd.read_csv('ad/ue_test.csv') + #The read_csv logic will be modified when we are going to fetch the data from database via sdl api. + #Read the input csv file + ue_data = pd.read_csv('/tmp/ad/ue_test.csv') - # Parse the ue data and predict the anomaly records for the randomly selected UEID + #Parse the ue data and predict the anomaly records for the randomly selected UEID data = parse(ue_data) db_df = HDB_PREDICT(data) - db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID', 'MeasTimestampRF']].head(1) - db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x: str(x)) # converts into string format + db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID','MeasTimestampRF' ]].head(1) + db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x : str(x)) # converts into string format + #print("db_df: ", db_df) # For debug purpose, we can enable this print statement # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback - result = json.loads(db_df.to_json(orient='records')) + result = json.loads(db_df.to_json(orient = 'records')) val = json.dumps(result).encode() - return val - - -def msg_to_ts(self, val): - # send message from ad to ts - print("rmr send value:", val) - self.rmr_send(val, 30003) - # rmr receive to get the acknowledgement message from the traffic steering. - for (summary, sbuf) in self.rmr_get_messages(): - print("TS_ANOMALY_ACK: {}".format(summary)) - self.rmr_free(sbuf) + if len(val) > 2 : + print("val: ", val) + self.rmr_send(val, 30003) + # rmr receive to get the acknowledgement message from the traffic steering. + for (summary, sbuf) in self.rmr_get_messages(): + print("TS_ANOMALY_ACK: {}".format(summary)) + self.rmr_free(sbuf) + +# Initiates xapp api and runs the entry() using xapp.run() +xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True) +xapp.run() -def start(): - # Initiates xapp api and runs the entry() using xapp.run() - xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True) - xapp.run()