[Issue-Id:RICAPP-155]Code changes to support for JJB and Unit testing for AD xApp
[ric-app/ad.git] / ad / main.py
index 8156af3..3298b5f 100644 (file)
 #  limitations under the License.
 # ==================================================================================
 
-import warnings
 import json
 import os
 from ricxappframe.xapp_frame import Xapp
 import pandas as pd
 from ad_model.tb_format import parse
 from ad_model.ad_model import HDB_PREDICT
-import schedule, time
+import schedule
 from ad_train import train
 
+
 def entry(self):
     """
-     If RF model is not present in the path, run train() to train the model for the prediction.
-     Calls predict function for every 1 second(for now as we are using simulated data).
+      If RF model is not present in the path, run train() to train the model for the prediction.
+      Calls predict function for every 1 second(for now as we are using simulated data).
     """
-    if not os.path.isfile('/tmp/ad/RF'):
+    if not os.path.isfile('ad/RF'):
         train()
     schedule.every(1).seconds.do(predict, self)
     while True:
         schedule.run_pending()
 
+
 def predict(self):
     """
-     Read the input csv file that has both normal and anomalous data.
-     Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set
-     Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003)
-     Get the acknowledgement message from the traffic steering.
+      Read the input csv file that has both normal and anomalous data.
+      Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set
+      Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003)
+      Get the acknowledgement message from the traffic steering.
     """
+    val = predict_anomaly(self)
+    if len(val) > 2:
+        msg_to_ts(self, val)
+
 
-    #The read_csv logic will be modified when we are going to fetch the data from database via sdl api.
-    #Read the input csv file 
-    ue_data = pd.read_csv('/tmp/ad/ue_test.csv')
+def predict_anomaly(self):
+    # The read_csv logic will be modified when we are going to fetch the data from database via sdl api.
+    # Read the input csv file
+    ue_data = pd.read_csv('ad/ue_test.csv')
 
-    #Parse the ue data and predict the anomaly records for the randomly selected UEID
+    # Parse the ue data and predict the anomaly records for the randomly selected UEID
     data = parse(ue_data)
     db_df = HDB_PREDICT(data)
-    db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID','MeasTimestampRF' ]].head(1)    
-    db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x : str(x)) # converts into string format
-    #print("db_df: ", db_df) # For debug purpose, we can enable this print statement
+    db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID', 'MeasTimestampRF']].head(1)
+    db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x: str(x))  # converts into string format
 
     # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
-    result = json.loads(db_df.to_json(orient = 'records'))    
+    result = json.loads(db_df.to_json(orient='records'))
     val = json.dumps(result).encode()
+    return val
+
+
+def msg_to_ts(self, val):
+    # send message from ad to ts
+    print("rmr send value:", val)
+    self.rmr_send(val, 30003)
 
-    if len(val) > 2 :
-        print("val: ", val)
-        self.rmr_send(val, 30003)
+    # rmr receive to get the acknowledgement message from the traffic steering.
+    for (summary, sbuf) in self.rmr_get_messages():
+        print("TS_ANOMALY_ACK: {}".format(summary))
+        self.rmr_free(sbuf)
 
-        # rmr receive to get the acknowledgement message from the traffic steering.
-        for (summary, sbuf) in self.rmr_get_messages():
-            print("TS_ANOMALY_ACK: {}".format(summary))
-            self.rmr_free(sbuf)
-    
-# Initiates xapp api and runs the entry() using xapp.run()
-xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True)
-xapp.run()
 
+def start():
+    # Initiates xapp api and runs the entry() using xapp.run()
+    xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True)
+    xapp.run()