Release 0.0.5
[ric-app/qp.git] / src / database.py
1 # ==================================================================================
2 #       Copyright (c) 2020 AT&T Intellectual Property.
3 #       Copyright (c) 2020 HCL Technologies Limited.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 from influxdb import DataFrameClient
18 from configparser import ConfigParser
19 from mdclogpy import Logger
20 from src.exceptions import NoDataError
21 from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
22 from requests.exceptions import RequestException
23 import pandas as pd
24 import time
25
26 logger = Logger(name=__name__)
27
28
29 class DATABASE(object):
30
31     def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
32         self.host = host
33         self.port = port
34         self.user = user
35         self.password = password
36         self.path = path
37         self.ssl = ssl
38         self.dbname = dbname
39         self.data = None
40         self.client = None
41         self.config()
42
43     def connect(self):
44         if self.client is not None:
45             self.client.close()
46
47         try:
48             self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl)
49             version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
50             logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
51             return True
52
53         except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError):
54             logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
55             time.sleep(120)
56
57     def read_data(self, meas='ueMeasReport', limit=10000, cellid=False, ueid=False):
58
59         if cellid:
60             meas = self.cellmeas
61             param = self.cid
62             Id = cellid
63
64         if ueid:
65             meas = self.uemeas
66             param = self.ue
67             limit = 1
68             Id = ueid
69
70         query = """select * from {}""".format(meas)
71         query += """ where "{}" = \'{}\'""".format(param, Id)
72         query += "  ORDER BY DESC LIMIT {}".format(limit)
73         self.query(query, meas, Id)
74
75     def query(self, query, meas, Id=False):
76         try:
77             result = self.client.query(query)
78             if len(result) == 0:
79                 raise NoDataError
80             else:
81                 self.data = result[meas]
82
83         except (RequestException, InfluxDBClientError, InfluxDBServerError):
84             logger.error("Failed to connect to influxdb")
85
86         except NoDataError:
87             self.data = None
88             if Id:
89                 logger.error("Data not found for " + Id + " in measurement: "+meas)
90             else:
91                 logger.error("Data not found for " + meas)
92
93     def cells(self, meas='CellReports', limit=100):
94         meas = self.cellmeas
95         query = """select * from {}""".format(meas)
96         query += " ORDER BY DESC LIMIT {}".format(limit)
97         self.query(query, meas)
98         if self.data is not None:
99             return self.data[self.cid].unique()
100
101     def write_prediction(self, df, meas_name='QP'):
102         try:
103             self.client.write_points(df, meas_name)
104         except (RequestException, InfluxDBClientError, InfluxDBServerError):
105             logger.error("Failed to send metrics to influxdb")
106
107     def config(self):
108         cfg = ConfigParser()
109         cfg.read('src/qp_config.ini')
110         for section in cfg.sections():
111             if section == 'influxdb':
112                 self.host = cfg.get(section, "host")
113                 self.port = cfg.get(section, "port")
114                 self.user = cfg.get(section, "user")
115                 self.password = cfg.get(section, "password")
116                 self.path = cfg.get(section, "path")
117                 self.ssl = cfg.get(section, "ssl")
118                 self.dbname = cfg.get(section, "database")
119                 self.cellmeas = cfg.get(section, "cellmeas")
120                 self.uemeas = cfg.get(section, "uemeas")
121
122             if section == 'features':
123                 self.thptparam = [cfg.get(section, "thptUL"), cfg.get(section, "thptDL")]
124                 self.nbcells = cfg.get(section, "nbcells")
125                 self.servcell = cfg.get(section, "servcell")
126                 self.ue = cfg.get(section, "ue")
127                 self.cid = cfg.get(section, "cid")
128
129
130 class DUMMY(DATABASE):
131
132     def __init__(self):
133         super().__init__()
134         self.ue_data = pd.DataFrame([[1002, "c2/B13", 8, 69, 65, 113, 0.1, 0.1, "Car-1", -882, -959, pd.to_datetime("2021-05-12T07:43:51.652")]], columns=["du-id", "RF.serving.Id", "prb_usage", "rsrp", "rsrq", "rssinr", "throughput", "targetTput", "ue-id", "x", "y", "measTimeStampRf"])
135
136         self.cell = pd.read_csv('src/cells.csv')
137
138     def read_data(self, meas='ueMeasReport', limit=100000, cellid=False, ueid=False):
139         if ueid:
140             self.data = self.ue_data.head(limit)
141         if cellid:
142             self.data = self.cell.head(limit)
143
144     def cells(self):
145         return self.cell[self.cid].unique()
146
147     def write_prediction(self, df, meas_name='QP'):
148         pass
149
150     def query(self, query=None):
151         return {'UEReports': self.ue_data.head(1)}