Release 0.0.5
[ric-app/qp.git] / src / database.py
diff --git a/src/database.py b/src/database.py
new file mode 100644 (file)
index 0000000..8520053
--- /dev/null
@@ -0,0 +1,151 @@
+# ==================================================================================
+#       Copyright (c) 2020 AT&T Intellectual Property.
+#       Copyright (c) 2020 HCL Technologies Limited.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+from influxdb import DataFrameClient
+from configparser import ConfigParser
+from mdclogpy import Logger
+from src.exceptions import NoDataError
+from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
+from requests.exceptions import RequestException
+import pandas as pd
+import time
+
+logger = Logger(name=__name__)
+
+
+class DATABASE(object):
+
+    def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.path = path
+        self.ssl = ssl
+        self.dbname = dbname
+        self.data = None
+        self.client = None
+        self.config()
+
+    def connect(self):
+        if self.client is not None:
+            self.client.close()
+
+        try:
+            self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl)
+            version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
+            logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
+            return True
+
+        except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError):
+            logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
+            time.sleep(120)
+
+    def read_data(self, meas='ueMeasReport', limit=10000, cellid=False, ueid=False):
+
+        if cellid:
+            meas = self.cellmeas
+            param = self.cid
+            Id = cellid
+
+        if ueid:
+            meas = self.uemeas
+            param = self.ue
+            limit = 1
+            Id = ueid
+
+        query = """select * from {}""".format(meas)
+        query += """ where "{}" = \'{}\'""".format(param, Id)
+        query += "  ORDER BY DESC LIMIT {}".format(limit)
+        self.query(query, meas, Id)
+
+    def query(self, query, meas, Id=False):
+        try:
+            result = self.client.query(query)
+            if len(result) == 0:
+                raise NoDataError
+            else:
+                self.data = result[meas]
+
+        except (RequestException, InfluxDBClientError, InfluxDBServerError):
+            logger.error("Failed to connect to influxdb")
+
+        except NoDataError:
+            self.data = None
+            if Id:
+                logger.error("Data not found for " + Id + " in measurement: "+meas)
+            else:
+                logger.error("Data not found for " + meas)
+
+    def cells(self, meas='CellReports', limit=100):
+        meas = self.cellmeas
+        query = """select * from {}""".format(meas)
+        query += " ORDER BY DESC LIMIT {}".format(limit)
+        self.query(query, meas)
+        if self.data is not None:
+            return self.data[self.cid].unique()
+
+    def write_prediction(self, df, meas_name='QP'):
+        try:
+            self.client.write_points(df, meas_name)
+        except (RequestException, InfluxDBClientError, InfluxDBServerError):
+            logger.error("Failed to send metrics to influxdb")
+
+    def config(self):
+        cfg = ConfigParser()
+        cfg.read('src/qp_config.ini')
+        for section in cfg.sections():
+            if section == 'influxdb':
+                self.host = cfg.get(section, "host")
+                self.port = cfg.get(section, "port")
+                self.user = cfg.get(section, "user")
+                self.password = cfg.get(section, "password")
+                self.path = cfg.get(section, "path")
+                self.ssl = cfg.get(section, "ssl")
+                self.dbname = cfg.get(section, "database")
+                self.cellmeas = cfg.get(section, "cellmeas")
+                self.uemeas = cfg.get(section, "uemeas")
+
+            if section == 'features':
+                self.thptparam = [cfg.get(section, "thptUL"), cfg.get(section, "thptDL")]
+                self.nbcells = cfg.get(section, "nbcells")
+                self.servcell = cfg.get(section, "servcell")
+                self.ue = cfg.get(section, "ue")
+                self.cid = cfg.get(section, "cid")
+
+
+class DUMMY(DATABASE):
+
+    def __init__(self):
+        super().__init__()
+        self.ue_data = pd.DataFrame([[1002, "c2/B13", 8, 69, 65, 113, 0.1, 0.1, "Car-1", -882, -959, pd.to_datetime("2021-05-12T07:43:51.652")]], columns=["du-id", "RF.serving.Id", "prb_usage", "rsrp", "rsrq", "rssinr", "throughput", "targetTput", "ue-id", "x", "y", "measTimeStampRf"])
+
+        self.cell = pd.read_csv('src/cells.csv')
+
+    def read_data(self, meas='ueMeasReport', limit=100000, cellid=False, ueid=False):
+        if ueid:
+            self.data = self.ue_data.head(limit)
+        if cellid:
+            self.data = self.cell.head(limit)
+
+    def cells(self):
+        return self.cell[self.cid].unique()
+
+    def write_prediction(self, df, meas_name='QP'):
+        pass
+
+    def query(self, query=None):
+        return {'UEReports': self.ue_data.head(1)}