Release 0.0.2
[ric-app/ad.git] / ad / main.py
index 3298b5f..5afb58f 100644 (file)
 
 import json
 import os
-from ricxappframe.xapp_frame import Xapp
 import pandas as pd
-from ad_model.tb_format import parse
-from ad_model.ad_model import HDB_PREDICT
 import schedule
+from ricxappframe.xapp_frame import Xapp
+from ad_model.ad_model import ad_predict, CAUSE
 from ad_train import train
+from ricxappframe.xapp_sdl import SDLWrapper
+from database import DATABASE, DUMMY
+import insert as ins
+
+db = None
+cp = None
+ue_data = None  # needs to be updated in future when live feed will be coming through KPIMON to influxDB
+pos = 0
+sdl = SDLWrapper(use_fake_sdl=True)
 
 
 def entry(self):
+    """  If ML model is not present in the path, It will trigger training module to train the model.
+      Calls predict function every 10 millisecond(for now as we are using simulated data).
     """
-      If RF model is not present in the path, run train() to train the model for the prediction.
-      Calls predict function for every 1 second(for now as we are using simulated data).
-    """
-    if not os.path.isfile('ad/RF'):
+    if not os.path.isfile('model'):
         train()
-    schedule.every(1).seconds.do(predict, self)
+    schedule.every(0.01).seconds.do(predict, self)
     while True:
         schedule.run_pending()
 
 
 def predict(self):
+    """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
+      Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
+      Get the acknowledgement of sent message from the traffic steering.
     """
-      Read the input csv file that has both normal and anomalous data.
-      Simulate diff UEIDs that participate in the anomaly by randomly selecting records from this scoring data set
-      Send the UEID and timestamp for the anomalous entries to the Traffic Steering (rmr with the message type as 30003)
-      Get the acknowledgement message from the traffic steering.
-    """
-    val = predict_anomaly(self)
-    if len(val) > 2:
+
+    global pos
+    pos = (pos + 1) % len(ue_data)  # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB
+    sample = ue_data[pos]
+    ue_df = pd.DataFrame([sample], columns=db.data.columns)
+    val = predict_anomaly(self, ue_df)
+    if (val is not None) and (len(val) > 2):
         msg_to_ts(self, val)
 
 
-def predict_anomaly(self):
-    # The read_csv logic will be modified when we are going to fetch the data from database via sdl api.
-    # Read the input csv file
-    ue_data = pd.read_csv('ad/ue_test.csv')
+def predict_anomaly(self, df):
+    """ calls ad_predict to detect if given sample is normal or anomalous
+    find out the degradation type if sample is anomalous
+    write given sample along with predicted label to AD measurement
 
-    # Parse the ue data and predict the anomaly records for the randomly selected UEID
-    data = parse(ue_data)
-    db_df = HDB_PREDICT(data)
-    db_df = db_df.loc[db_df['Anomaly'] == 1][['UEID', 'MeasTimestampRF']].head(1)
-    db_df['MeasTimestampRF'] = db_df['MeasTimestampRF'].apply(lambda x: str(x))  # converts into string format
+    Parameter
+    ........
+    ue: array or dataframe
 
-    # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
-    result = json.loads(db_df.to_json(orient='records'))
-    val = json.dumps(result).encode()
+    Return
+    ......
+    val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
+    """
+    pred = ad_predict(df)
+    df['Anomaly'] = pred
+    df['Degradation'] = ''
+    val = None
+    if 1 in pred:
+        deg = cp.cause(df)
+        if deg:
+            df['Degradation'] = deg
+            db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']]
+
+            # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
+            result = json.loads(db_df.to_json(orient='records'))
+            val = json.dumps(result).encode()
+            df.loc[db_df.index, 'Degradation'] = db_df['Degradation']
+    df.index = df.measTimeStampRf
+    result = json.loads(df.to_json(orient='records'))
+
+    df = df.drop('measTimeStampRf', axis=1)
+    db.write_anomaly(df, 'AD')
     return val
 
 
 def msg_to_ts(self, val):
     # send message from ad to ts
-    print("rmr send value:", val)
-    self.rmr_send(val, 30003)
-
+    print("[INFO] Sending Anomalous UE to TS")
+    success = self.rmr_send(val, 30003)
+    if success:
+        print("[INFO] Message to TS: message sent Successfully")
     # rmr receive to get the acknowledgement message from the traffic steering.
     for (summary, sbuf) in self.rmr_get_messages():
-        print("TS_ANOMALY_ACK: {}".format(summary))
+        print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
         self.rmr_free(sbuf)
 
 
-def start():
+def connectdb(thread=False):
+    # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
+    global db
+    global cp
+    global ue_data
+    if thread:
+        db = DUMMY()
+    else:
+        ins.populatedb()  # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB
+
+        db = DATABASE('UEData')
+        db.read_data("liveUE")
+        ue_data = db.data.values.tolist()  # needs to be updated in future when live feed will be coming through KPIMON to influxDB
+    cp = CAUSE(db)
+
+
+def start(thread=False):
     # Initiates xapp api and runs the entry() using xapp.run()
-    xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True)
+    xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
+    connectdb(thread)
     xapp.run()