Release 1.0.1
[ric-app/ad.git] / src / database.py
1 # ==================================================================================
2 #  Copyright (c) 2020 HCL Technologies Limited.
3 #
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 # ==================================================================================
16 import time
17 import pandas as pd
18 from influxdb import DataFrameClient
19 from configparser import ConfigParser
20 from mdclogpy import Logger
21 from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
22 from requests.exceptions import RequestException, ConnectionError
23
24 logger = Logger(name=__name__)
25
26
27 class DATABASE(object):
28     r""" DATABASE takes an input as database name. It creates a client connection
29       to influxDB and It reads/ writes UE data for a given dabtabase and a measurement.
30
31
32     Parameters
33     ----------
34     host: str (default='r4-influxdb.ricplt.svc.cluster.local')
35         hostname to connect to InfluxDB
36     port: int (default='8086')
37         port to connect to InfluxDB
38     username: str (default='root')
39         user to connect
40     password: str (default='root')
41         password of the use
42
43     Attributes
44     ----------
45     client: influxDB client
46         DataFrameClient api to connect influxDB
47     data: DataFrame
48         fetched data from database
49     """
50
51     def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
52         self.data = None
53         self.host = host
54         self.port = port
55         self.user = user
56         self.password = password
57         self.path = path
58         self.ssl = ssl
59         self.dbname = dbname
60         self.client = None
61         self.config()
62
63     def connect(self):
64         if self.client is not None:
65             self.client.close()
66
67         try:
68             self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl)
69             version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
70             logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
71             return True
72
73         except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError):
74             logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
75             time.sleep(120)
76
77     def read_data(self, train=False, valid=False, limit=False):
78         """Read data method for a given measurement and limit
79
80         Parameters
81         ----------
82         meas: str (default='ueMeasReport')
83         limit:int (defualt=False)
84         """
85         self.data = None
86         query = 'select * from ' + self.meas
87         if not train and not valid and not limit:
88             query += ' where time>now()-1600ms'
89         elif train:
90             query += ' where time<now()-5m and time>now()-75m'
91         elif valid:
92             query += ' where time>now()-5m'
93         elif limit:
94             query += ' where time>now()-1m limit '+str(limit)
95         result = self.query(query)
96         if result and len(result[self.meas]) != 0:
97             self.data = result[self.meas]
98
99     def write_anomaly(self, df, meas='AD'):
100         """Write data method for a given measurement
101
102         Parameters
103         ----------
104         meas: str (default='AD')
105         """
106         try:
107             self.client.write_points(df, meas)
108         except (RequestException, InfluxDBClientError, InfluxDBServerError) as e:
109             logger.error('Failed to send metrics to influxdb')
110             print(e)
111
112     def query(self, query):
113         try:
114             result = self.client.query(query)
115         except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError) as e:
116             logger.error('Failed to connect to influxdb: {}'.format(e))
117             result = False
118         return result
119
120     def config(self):
121         cfg = ConfigParser()
122         cfg.read('src/ad_config.ini')
123         for section in cfg.sections():
124             if section == 'influxdb':
125                 self.host = cfg.get(section, "host")
126                 self.port = cfg.get(section, "port")
127                 self.user = cfg.get(section, "user")
128                 self.password = cfg.get(section, "password")
129                 self.path = cfg.get(section, "path")
130                 self.ssl = cfg.get(section, "ssl")
131                 self.dbname = cfg.get(section, "database")
132                 self.meas = cfg.get(section, "measurement")
133
134             if section == 'features':
135                 self.thpt = cfg.get(section, "thpt")
136                 self.rsrp = cfg.get(section, "rsrp")
137                 self.rsrq = cfg.get(section, "rsrq")
138                 self.rssinr = cfg.get(section, "rssinr")
139                 self.prb = cfg.get(section, "prb_usage")
140                 self.ue = cfg.get(section, "ue")
141                 self.anomaly = cfg.get(section, "anomaly")
142                 self.a1_param = cfg.get(section, "a1_param")
143
144
145 class DUMMY(DATABASE):
146
147     def __init__(self):
148         super().__init__()
149         self.ue_data = pd.read_csv('src/ue.csv')
150
151     def connect(self):
152         return True
153
154     def read_data(self, train=False, valid=False, limit=100000):
155         if not train:
156             self.data = self.ue_data.head(limit)
157         else:
158             self.data = self.ue_data.head(limit).drop(self.anomaly, axis=1)
159
160     def write_anomaly(self, df, meas_name='AD'):
161         pass
162
163     def query(self, query=None):
164         return {'UEReports': self.ue_data.head(1)}