1 # ==================================================================================
2 # Copyright (c) 2020 AT&T Intellectual Property.
3 # Copyright (c) 2020 HCL Technologies Limited.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
17 from influxdb import DataFrameClient
18 from configparser import ConfigParser
19 from mdclogpy import Logger
20 from src.exceptions import NoDataError
21 from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
22 from requests.exceptions import RequestException
26 logger = Logger(name=__name__)
29 class DATABASE(object):
31 def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
35 self.password = password
44 if self.client is not None:
48 self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl)
49 version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
50 logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
53 except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError):
54 logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
57 def read_data(self, meas='ueMeasReport', limit=10000, cellid=False, ueid=False):
70 query = """select * from {}""".format(meas)
71 query += """ where "{}" = \'{}\'""".format(param, Id)
72 query += " ORDER BY DESC LIMIT {}".format(limit)
73 self.query(query, meas, Id)
75 def query(self, query, meas, Id=False):
77 result = self.client.query(query)
81 self.data = result[meas]
83 except (RequestException, InfluxDBClientError, InfluxDBServerError):
84 logger.error("Failed to connect to influxdb")
89 logger.error("Data not found for " + Id + " in measurement: "+meas)
91 logger.error("Data not found for " + meas)
93 def cells(self, meas='CellReports', limit=100):
95 query = """select * from {}""".format(meas)
96 query += " ORDER BY DESC LIMIT {}".format(limit)
97 self.query(query, meas)
98 if self.data is not None:
99 return self.data[self.cid].unique()
101 def write_prediction(self, df, meas_name='QP'):
103 self.client.write_points(df, meas_name)
104 except (RequestException, InfluxDBClientError, InfluxDBServerError):
105 logger.error("Failed to send metrics to influxdb")
109 cfg.read('src/qp_config.ini')
110 for section in cfg.sections():
111 if section == 'influxdb':
112 self.host = cfg.get(section, "host")
113 self.port = cfg.get(section, "port")
114 self.user = cfg.get(section, "user")
115 self.password = cfg.get(section, "password")
116 self.path = cfg.get(section, "path")
117 self.ssl = cfg.get(section, "ssl")
118 self.dbname = cfg.get(section, "database")
119 self.cellmeas = cfg.get(section, "cellmeas")
120 self.uemeas = cfg.get(section, "uemeas")
122 if section == 'features':
123 self.thptparam = [cfg.get(section, "thptUL"), cfg.get(section, "thptDL")]
124 self.nbcells = cfg.get(section, "nbcells")
125 self.servcell = cfg.get(section, "servcell")
126 self.ue = cfg.get(section, "ue")
127 self.cid = cfg.get(section, "cid")
130 class DUMMY(DATABASE):
134 self.ue_data = pd.DataFrame([[1002, "c2/B13", 8, 69, 65, 113, 0.1, 0.1, "Car-1", -882, -959, pd.to_datetime("2021-05-12T07:43:51.652")]], columns=["du-id", "RF.serving.Id", "prb_usage", "rsrp", "rsrq", "rssinr", "throughput", "targetTput", "ue-id", "x", "y", "measTimeStampRf"])
136 self.cell = pd.read_csv('src/cells.csv')
138 def read_data(self, meas='ueMeasReport', limit=100000, cellid=False, ueid=False):
140 self.data = self.ue_data.head(limit)
142 self.data = self.cell.head(limit)
145 return self.cell[self.cid].unique()
147 def write_prediction(self, df, meas_name='QP'):
150 def query(self, query=None):
151 return {'UEReports': self.ue_data.head(1)}