# limitations under the License.
# ==================================================================================
-import warnings
import json
import os
from ricxappframe.xapp_frame import Xapp
import pandas as pd
from ad_model.tb_format import parse
from ad_model.ad_model import HDB_PREDICT
-import schedule, time
+import schedule
from ad_train import train
+
def entry(self):
"""
- If RF model is not present in the path, run train() to train the model for the prediction.
- Calls predict function for every 1 second(for now as we are using simulated data).
+ If RF model is not present in the path, run train() to train the model for the prediction.
+ Calls predict function for every 1 second(for now as we are using simulated data).
"""
- if not os.path.isfile('/tmp/ad/RF'):
+ if not os.path.isfile('ad/RF'):
train()
schedule.every(1).seconds.do(predict, self)
while True:
schedule.run_pending()
+
def predict(self):
"""
- Read the input csv file that has both normal and anomalous data.
- Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set
- Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003)
- Get the acknowledgement message from the traffic steering.
+ Read the input csv file that has both normal and anomalous data.
+ Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set
+ Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003)
+ Get the acknowledgement message from the traffic steering.
"""
+ val = predict_anomaly(self)
+ if len(val) > 2:
+ msg_to_ts(self, val)
+
- #The read_csv logic will be modified when we are going to fetch the data from database via sdl api.
- #Read the input csv file
- ue_data = pd.read_csv('/tmp/ad/ue_test.csv')
+def predict_anomaly(self):
+ # The read_csv logic will be modified when we are going to fetch the data from database via sdl api.
+ # Read the input csv file
+ ue_data = pd.read_csv('ad/ue_test.csv')
- #Parse the ue data and predict the anomaly records for the randomly selected UEID
+ # Parse the ue data and predict the anomaly records for the randomly selected UEID
data = parse(ue_data)
db_df = HDB_PREDICT(data)
- db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID','MeasTimestampRF' ]].head(1)
- db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x : str(x)) # converts into string format
- #print("db_df: ", db_df) # For debug purpose, we can enable this print statement
+ db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID', 'MeasTimestampRF']].head(1)
+ db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x: str(x)) # converts into string format
# rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
- result = json.loads(db_df.to_json(orient = 'records'))
+ result = json.loads(db_df.to_json(orient='records'))
val = json.dumps(result).encode()
+ return val
+
+
+def msg_to_ts(self, val):
+ # send message from ad to ts
+ print("rmr send value:", val)
+ self.rmr_send(val, 30003)
- if len(val) > 2 :
- print("val: ", val)
- self.rmr_send(val, 30003)
+ # rmr receive to get the acknowledgement message from the traffic steering.
+ for (summary, sbuf) in self.rmr_get_messages():
+ print("TS_ANOMALY_ACK: {}".format(summary))
+ self.rmr_free(sbuf)
- # rmr receive to get the acknowledgement message from the traffic steering.
- for (summary, sbuf) in self.rmr_get_messages():
- print("TS_ANOMALY_ACK: {}".format(summary))
- self.rmr_free(sbuf)
-
-# Initiates xapp api and runs the entry() using xapp.run()
-xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True)
-xapp.run()
+def start():
+ # Initiates xapp api and runs the entry() using xapp.run()
+ xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True)
+ xapp.run()