[Issue-Id:RICAPP-204] Bump version to 1.0.0 and push to staging area
[ric-app/ad.git] / src / main.py
similarity index 62%
rename from ad/main.py
rename to src/main.py
index 5afb58f..f2a5baf 100644 (file)
 
 import json
 import os
+import time
 import pandas as pd
 import schedule
 from ricxappframe.xapp_frame import Xapp
-from ad_model.ad_model import ad_predict, CAUSE
-from ad_train import train
 from ricxappframe.xapp_sdl import SDLWrapper
+from mdclogpy import Logger
+from ad_model import modelling, CAUSE
+from ad_train import ModelTraining
 from database import DATABASE, DUMMY
-import insert as ins
 
 db = None
 cp = None
-ue_data = None  # needs to be updated in future when live feed will be coming through KPIMON to influxDB
-pos = 0
 sdl = SDLWrapper(use_fake_sdl=True)
 
+logger = Logger(name=__name__)
+
 
 def entry(self):
     """  If ML model is not present in the path, It will trigger training module to train the model.
       Calls predict function every 10 millisecond(for now as we are using simulated data).
     """
-    if not os.path.isfile('model'):
-        train()
-    schedule.every(0.01).seconds.do(predict, self)
+    connectdb()
+    train_model()
+    load_model()
+    schedule.every(0.5).seconds.do(predict, self)
     while True:
         schedule.run_pending()
 
 
+def load_model():
+    global md
+    global cp
+    md = modelling()
+    cp = CAUSE()
+
+
+def train_model():
+    if not os.path.isfile('src/model'):
+        mt = ModelTraining(db)
+        mt.train()
+
+
 def predict(self):
     """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
       Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
       Get the acknowledgement of sent message from the traffic steering.
     """
-
-    global pos
-    pos = (pos + 1) % len(ue_data)  # iterate through entire list one by one in cycle manner and will be updated when live feed will be coming through KPIMON to influxDB
-    sample = ue_data[pos]
-    ue_df = pd.DataFrame([sample], columns=db.data.columns)
-    val = predict_anomaly(self, ue_df)
+    db.read_data()
+    val = None
+    if db.data is not None:
+        if set(md.num).issubset(db.data.columns):
+            db.data = db.data.dropna(axis=0)
+            if len(db.data) > 0:
+                val = predict_anomaly(self, db.data)
+        else:
+            logger.warning("Parameters does not match with of training data")
+    else:
+        logger.warning("No data in last 1 second")
+        time.sleep(1)
     if (val is not None) and (len(val) > 2):
         msg_to_ts(self, val)
 
@@ -71,58 +92,50 @@ def predict_anomaly(self, df):
     ......
     val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
     """
-    pred = ad_predict(df)
-    df['Anomaly'] = pred
-    df['Degradation'] = ''
+    df['Anomaly'] = md.predict(df)
+    df.loc[:, 'Degradation'] = ''
     val = None
-    if 1 in pred:
-        deg = cp.cause(df)
-        if deg:
-            df['Degradation'] = deg
-            db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']]
-
+    if 1 in df.Anomaly.unique():
+        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
+        df_a = df.loc[df['Anomaly'] == 1].copy()
+        if len(df_a) > 0:
+            df_a['time'] = df_a.index
+            cols = [db.ue, 'time', 'Degradation']
             # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
-            result = json.loads(db_df.to_json(orient='records'))
+            result = json.loads(df_a.loc[:, cols].to_json(orient='records'))
             val = json.dumps(result).encode()
-            df.loc[db_df.index, 'Degradation'] = db_df['Degradation']
-    df.index = df.measTimeStampRf
-    result = json.loads(df.to_json(orient='records'))
-
-    df = df.drop('measTimeStampRf', axis=1)
-    db.write_anomaly(df, 'AD')
+    df.loc[:, 'RRU.PrbUsedDl'] = df['RRU.PrbUsedDl'].astype('float')
+    df.index = pd.date_range(start=df.index[0], periods=len(df), freq='1ms')
+    db.write_anomaly(df)
     return val
 
 
 def msg_to_ts(self, val):
     # send message from ad to ts
-    print("[INFO] Sending Anomalous UE to TS")
+    logger.debug("Sending Anomalous UE to TS")
     success = self.rmr_send(val, 30003)
     if success:
-        print("[INFO] Message to TS: message sent Successfully")
+        logger.info(" Message to TS: message sent Successfully")
     # rmr receive to get the acknowledgement message from the traffic steering.
     for (summary, sbuf) in self.rmr_get_messages():
-        print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+        logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
         self.rmr_free(sbuf)
 
 
 def connectdb(thread=False):
     # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
     global db
-    global cp
-    global ue_data
     if thread:
         db = DUMMY()
     else:
-        ins.populatedb()  # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB
-
-        db = DATABASE('UEData')
-        db.read_data("liveUE")
-        ue_data = db.data.values.tolist()  # needs to be updated in future when live feed will be coming through KPIMON to influxDB
-    cp = CAUSE(db)
+        db = DATABASE()
+    success = False
+    while not success:
+        success = db.connect()
 
 
 def start(thread=False):
     # Initiates xapp api and runs the entry() using xapp.run()
     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
-    connectdb(thread)
+    xapp.logger.debug("AD xApp starting")
     xapp.run()