+ pred = ad_predict(df)
+ df['Anomaly'] = pred
+ df['Degradation'] = ''
+ val = None
+ if 1 in pred:
+ deg = cp.cause(df)
+ if deg:
+ df['Degradation'] = deg
+ db_df = df[['du-id', 'ue-id', 'measTimeStampRf', 'Degradation']]
+
+ # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
+ result = json.loads(db_df.to_json(orient='records'))
+ val = json.dumps(result).encode()
+ df.loc[db_df.index, 'Degradation'] = db_df['Degradation']
+ df.index = df.measTimeStampRf
+ result = json.loads(df.to_json(orient='records'))
+
+ df = df.drop('measTimeStampRf', axis=1)
+ db.write_anomaly(df, 'AD')
+ return val
+
+
+def msg_to_ts(self, val):
+ # send message from ad to ts
+ print("[INFO] Sending Anomalous UE to TS")
+ success = self.rmr_send(val, 30003)
+ if success:
+ print("[INFO] Message to TS: message sent Successfully")
+ # rmr receive to get the acknowledgement message from the traffic steering.
+ for (summary, sbuf) in self.rmr_get_messages():
+ print("[INFO] Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+ self.rmr_free(sbuf)
+
+
+def connectdb(thread=False):
+ # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
+ global db
+ global cp
+ global ue_data
+ if thread:
+ db = DUMMY()
+ else:
+ ins.populatedb() # temporary method to populate db, it will be removed when data will be coming through KPIMON to influxDB
+
+ db = DATABASE('UEData')
+ db.read_data("liveUE")
+ ue_data = db.data.values.tolist() # needs to be updated in future when live feed will be coming through KPIMON to influxDB
+ cp = CAUSE(db)