X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fdatabase.py;fp=src%2Fdatabase.py;h=8520053178e55ae9f57bd51d5e47f462a0a9c09e;hb=30a9743fdfd0ef62164c7ea74a4a120cb1c86852;hp=0000000000000000000000000000000000000000;hpb=48aa41171b8ca141df1f14341ea0ec2fde745af5;p=ric-app%2Fqp.git diff --git a/src/database.py b/src/database.py new file mode 100644 index 0000000..8520053 --- /dev/null +++ b/src/database.py @@ -0,0 +1,151 @@ +# ================================================================================== +# Copyright (c) 2020 AT&T Intellectual Property. +# Copyright (c) 2020 HCL Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== +from influxdb import DataFrameClient +from configparser import ConfigParser +from mdclogpy import Logger +from src.exceptions import NoDataError +from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError +from requests.exceptions import RequestException +import pandas as pd +import time + +logger = Logger(name=__name__) + + +class DATABASE(object): + + def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False): + self.host = host + self.port = port + self.user = user + self.password = password + self.path = path + self.ssl = ssl + self.dbname = dbname + self.data = None + self.client = None + self.config() + + def connect(self): + if self.client is not None: + self.client.close() + + try: + self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl) + version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version'] + logger.info("Conected to Influx Database, InfluxDB version : {}".format(version)) + return True + + except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError): + logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname") + time.sleep(120) + + def read_data(self, meas='ueMeasReport', limit=10000, cellid=False, ueid=False): + + if cellid: + meas = self.cellmeas + param = self.cid + Id = cellid + + if ueid: + meas = self.uemeas + param = self.ue + limit = 1 + Id = ueid + + query = """select * from {}""".format(meas) + query += """ where "{}" = \'{}\'""".format(param, Id) + query += " ORDER BY DESC LIMIT {}".format(limit) + self.query(query, meas, Id) + + def query(self, query, meas, Id=False): + try: + result = self.client.query(query) + if len(result) == 0: + raise NoDataError + else: + self.data = result[meas] + + except (RequestException, InfluxDBClientError, InfluxDBServerError): + logger.error("Failed to connect to influxdb") + + except NoDataError: + self.data = None + if Id: + logger.error("Data not found for " + Id + " in measurement: "+meas) + else: + logger.error("Data not found for " + meas) + + def cells(self, meas='CellReports', limit=100): + meas = self.cellmeas + query = """select * from {}""".format(meas) + query += " ORDER BY DESC LIMIT {}".format(limit) + self.query(query, meas) + if self.data is not None: + return self.data[self.cid].unique() + + def write_prediction(self, df, meas_name='QP'): + try: + self.client.write_points(df, meas_name) + except (RequestException, InfluxDBClientError, InfluxDBServerError): + logger.error("Failed to send metrics to influxdb") + + def config(self): + cfg = ConfigParser() + cfg.read('src/qp_config.ini') + for section in cfg.sections(): + if section == 'influxdb': + self.host = cfg.get(section, "host") + self.port = cfg.get(section, "port") + self.user = cfg.get(section, "user") + self.password = cfg.get(section, "password") + self.path = cfg.get(section, "path") + self.ssl = cfg.get(section, "ssl") + self.dbname = cfg.get(section, "database") + self.cellmeas = cfg.get(section, "cellmeas") + self.uemeas = cfg.get(section, "uemeas") + + if section == 'features': + self.thptparam = [cfg.get(section, "thptUL"), cfg.get(section, "thptDL")] + self.nbcells = cfg.get(section, "nbcells") + self.servcell = cfg.get(section, "servcell") + self.ue = cfg.get(section, "ue") + self.cid = cfg.get(section, "cid") + + +class DUMMY(DATABASE): + + def __init__(self): + super().__init__() + self.ue_data = pd.DataFrame([[1002, "c2/B13", 8, 69, 65, 113, 0.1, 0.1, "Car-1", -882, -959, pd.to_datetime("2021-05-12T07:43:51.652")]], columns=["du-id", "RF.serving.Id", "prb_usage", "rsrp", "rsrq", "rssinr", "throughput", "targetTput", "ue-id", "x", "y", "measTimeStampRf"]) + + self.cell = pd.read_csv('src/cells.csv') + + def read_data(self, meas='ueMeasReport', limit=100000, cellid=False, ueid=False): + if ueid: + self.data = self.ue_data.head(limit) + if cellid: + self.data = self.cell.head(limit) + + def cells(self): + return self.cell[self.cid].unique() + + def write_prediction(self, df, meas_name='QP'): + pass + + def query(self, query=None): + return {'UEReports': self.ue_data.head(1)}