1 # ==================================================================================
2 # Copyright (c) 2020 HCL Technologies Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
18 from influxdb import DataFrameClient
19 from configparser import ConfigParser
20 from mdclogpy import Logger
21 from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
22 from requests.exceptions import RequestException, ConnectionError
24 logger = Logger(name=__name__)
27 class DATABASE(object):
28 r""" DATABASE takes an input as database name. It creates a client connection
29 to influxDB and It reads/ writes UE data for a given dabtabase and a measurement.
34 host: str (default='r4-influxdb.ricplt.svc.cluster.local')
35 hostname to connect to InfluxDB
36 port: int (default='8086')
37 port to connect to InfluxDB
38 username: str (default='root')
40 password: str (default='root')
45 client: influxDB client
46 DataFrameClient api to connect influxDB
48 fetched data from database
51 def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
56 self.password = password
64 if self.client is not None:
68 self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl)
69 version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
70 logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
73 except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError):
74 logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
77 def read_data(self, train=False, valid=False, limit=False):
78 """Read data method for a given measurement and limit
82 meas: str (default='ueMeasReport')
83 limit:int (defualt=False)
86 query = 'select * from ' + self.meas
87 if not train and not valid and not limit:
88 query += ' where time>now()-1600ms'
90 query += ' where time<now()-5m and time>now()-75m'
92 query += ' where time>now()-5m'
94 query += ' where time>now()-1m limit '+str(limit)
95 result = self.query(query)
96 if result and len(result[self.meas]) != 0:
97 self.data = result[self.meas]
99 def write_anomaly(self, df, meas='AD'):
100 """Write data method for a given measurement
104 meas: str (default='AD')
107 self.client.write_points(df, meas)
108 except (RequestException, InfluxDBClientError, InfluxDBServerError) as e:
109 logger.error('Failed to send metrics to influxdb')
112 def query(self, query):
114 result = self.client.query(query)
115 except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError) as e:
116 logger.error('Failed to connect to influxdb: {}'.format(e))
122 cfg.read('src/ad_config.ini')
123 for section in cfg.sections():
124 if section == 'influxdb':
125 self.host = cfg.get(section, "host")
126 self.port = cfg.get(section, "port")
127 self.user = cfg.get(section, "user")
128 self.password = cfg.get(section, "password")
129 self.path = cfg.get(section, "path")
130 self.ssl = cfg.get(section, "ssl")
131 self.dbname = cfg.get(section, "database")
132 self.meas = cfg.get(section, "measurement")
134 if section == 'features':
135 self.thpt = cfg.get(section, "thpt")
136 self.rsrp = cfg.get(section, "rsrp")
137 self.rsrq = cfg.get(section, "rsrq")
138 self.rssinr = cfg.get(section, "rssinr")
139 self.prb = cfg.get(section, "prb_usage")
140 self.ue = cfg.get(section, "ue")
141 self.anomaly = cfg.get(section, "anomaly")
142 self.a1_param = cfg.get(section, "a1_param")
145 class DUMMY(DATABASE):
149 self.ue_data = pd.read_csv('src/ue.csv')
154 def read_data(self, train=False, valid=False, limit=100000):
156 self.data = self.ue_data.head(limit)
158 self.data = self.ue_data.head(limit).drop(self.anomaly, axis=1)
160 def write_anomaly(self, df, meas_name='AD'):
163 def query(self, query=None):
164 return {'UEReports': self.ue_data.head(1)}