[Issue-Id:RICAPP-204] Bump version to 1.0.0 and push to staging area
[ric-app/ad.git] / src / database.py
diff --git a/src/database.py b/src/database.py
new file mode 100644 (file)
index 0000000..8609a2e
--- /dev/null
@@ -0,0 +1,163 @@
+# ==================================================================================
+#  Copyright (c) 2020 HCL Technologies Limited.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# ==================================================================================
+import time
+import pandas as pd
+from influxdb import DataFrameClient
+from configparser import ConfigParser
+from mdclogpy import Logger
+from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
+from requests.exceptions import RequestException, ConnectionError
+
+logger = Logger(name=__name__)
+
+
+class DATABASE(object):
+    r""" DATABASE takes an input as database name. It creates a client connection
+      to influxDB and It reads/ writes UE data for a given dabtabase and a measurement.
+
+
+    Parameters
+    ----------
+    host: str (default='r4-influxdb.ricplt.svc.cluster.local')
+        hostname to connect to InfluxDB
+    port: int (default='8086')
+        port to connect to InfluxDB
+    username: str (default='root')
+        user to connect
+    password: str (default='root')
+        password of the use
+
+    Attributes
+    ----------
+    client: influxDB client
+        DataFrameClient api to connect influxDB
+    data: DataFrame
+        fetched data from database
+    """
+
+    def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
+        self.data = None
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.path = path
+        self.ssl = ssl
+        self.dbname = dbname
+        self.client = None
+        self.config()
+
+    def connect(self):
+        if self.client is not None:
+            self.client.close()
+
+        try:
+            self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl)
+            version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
+            logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
+            return True
+
+        except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError):
+            logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
+            time.sleep(120)
+
+    def read_data(self, train=False, valid=False, limit=False):
+        """Read data method for a given measurement and limit
+
+        Parameters
+        ----------
+        meas: str (default='ueMeasReport')
+        limit:int (defualt=False)
+        """
+        self.data = None
+        query = 'select * from ' + self.meas
+        if not train and not valid and not limit:
+            query += ' where time>now()-1600ms'
+        elif train:
+            query += ' where time<now()-5m and time>now()-75m'
+        elif valid:
+            query += ' where time>now()-5m'
+        elif limit:
+            query += ' where time>now()-1m limit '+str(limit)
+        result = self.query(query)
+        if result and len(result[self.meas]) != 0:
+            self.data = result[self.meas]
+
+    def write_anomaly(self, df, meas='AD'):
+        """Write data method for a given measurement
+
+        Parameters
+        ----------
+        meas: str (default='AD')
+        """
+        try:
+            self.client.write_points(df, meas)
+        except (RequestException, InfluxDBClientError, InfluxDBServerError) as e:
+            logger.error('Failed to send metrics to influxdb')
+            print(e)
+
+    def query(self, query):
+        try:
+            result = self.client.query(query)
+        except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError) as e:
+            logger.error('Failed to connect to influxdb: {}'.format(e))
+            result = False
+        return result
+
+    def config(self):
+        cfg = ConfigParser()
+        cfg.read('src/ad_config.ini')
+        for section in cfg.sections():
+            if section == 'influxdb':
+                self.host = cfg.get(section, "host")
+                self.port = cfg.get(section, "port")
+                self.user = cfg.get(section, "user")
+                self.password = cfg.get(section, "password")
+                self.path = cfg.get(section, "path")
+                self.ssl = cfg.get(section, "ssl")
+                self.dbname = cfg.get(section, "database")
+                self.meas = cfg.get(section, "measurement")
+
+            if section == 'features':
+                self.thpt = cfg.get(section, "thpt")
+                self.rsrp = cfg.get(section, "rsrp")
+                self.rsrq = cfg.get(section, "rsrq")
+                self.rssinr = cfg.get(section, "rssinr")
+                self.prb = cfg.get(section, "prb_usage")
+                self.ue = cfg.get(section, "ue")
+                self.anomaly = cfg.get(section, "anomaly")
+
+
+class DUMMY(DATABASE):
+
+    def __init__(self):
+        super().__init__()
+        self.ue_data = pd.read_csv('src/ue.csv')
+
+    def connect(self):
+        return True
+
+    def read_data(self, train=False, valid=False, limit=100000):
+        if not train:
+            self.data = self.ue_data.head(limit)
+        else:
+            self.data = self.ue_data.head(limit).drop(self.anomaly, axis=1)
+
+    def write_anomaly(self, df, meas_name='AD'):
+        pass
+
+    def query(self, query=None):
+        return {'UEReports': self.ue_data.head(1)}