0ce3438dc2bf2b42b37022c4a9c06d3d5217ea02
[ric-app/qp.git] / src / qptrain.py
1 # ==================================================================================
2 #  Copyright (c) 2020 HCL Technologies Limited.
3 #
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 # ==================================================================================
16
17 from statsmodels.tsa.api import VAR
18 from statsmodels.tsa.stattools import adfuller
19 from mdclogpy import Logger
20 from exceptions import DataNotMatchError
21 from sklearn.metrics import mean_squared_error
22 from math import sqrt
23 import joblib
24 import warnings
25 warnings.filterwarnings("ignore")
26
27 logger = Logger(name=__name__)
28
29
30 class PROCESS(object):
31
32     def __init__(self, data):
33         self.diff = 0
34         self.data = data
35
36     def input_data(self):
37         try:
38             self.data = self.data[db.thptparam]
39             self.data = self.data.fillna(method='bfill')
40         except DataNotMatchError:
41             logger.error('Parameters Downlink throughput and Uplink throughput does not exist in provided data')
42             self.data = None
43
44     def adfuller_test(self, series, thresh=0.05, verbose=False):
45         """ADFuller test for Stationarity of given series and return True or False"""
46         r = adfuller(series, autolag='AIC')
47         output = {'test_statistic': round(r[0], 4), 'pvalue': round(r[1], 4), 'n_lags': round(r[2], 4), 'n_obs': r[3]}
48         p_value = output['pvalue']
49         if p_value <= thresh:
50             return True
51         else:
52             return False
53
54     def make_stationary(self):
55         """ call adfuller_test() to check for stationary
56             If the column is stationary, perform 1st differencing and return data"""
57         df = self.data.copy()
58         res_adf = []
59         for name, column in df.iteritems():
60             res_adf.append(self.adfuller_test(column))  # Perform ADF test
61         if not all(res_adf):
62             self.data = df.diff().dropna()
63             self.diff += 1
64
65     def invert_transformation(self, inp, forecast):
66         """Revert back the differencing to get the forecast to original scale."""
67         if self.diff == 0:
68             return forecast
69         df = forecast.copy()
70         columns = inp.columns
71         for col in columns:
72             df[col] = inp[col].iloc[-1] + df[col].cumsum()
73         self.diff = 0
74         return df
75
76     def process(self):
77         self.input_data()
78         self.make_stationary()  # check for Stationarity and make the Time Series Stationary
79
80     def constant(self):
81         val = True
82         df = self.data.copy()
83         df = df.drop_duplicates().dropna()
84         df = df.loc[:, (df != 0).any(axis=0)]
85         if len(df) >= 10:
86             val = False
87         return val
88
89     def evaluate_var(self, X, lag):
90         # prepare training dataset
91         train_size = int(len(X) * 0.75)
92         train, test = X[0:train_size], X[train_size:]
93         # make predictions
94         model = VAR(train)
95         model_fit = model.fit(lag)
96         predictions = model_fit.forecast(y=train.values, steps=len(test))
97         # calculate out of sample error
98         rmse = sqrt(mean_squared_error(test, predictions))
99         return rmse
100
101     def optimize_lag(self, df):
102         lag = range(1, 20, 1)
103         df = df.astype('float32')
104         best_score, best_lag = float("inf"), None
105         for l in lag:
106             try:
107                 rmse = self.evaluate_var(df, l)
108                 if rmse < best_score:
109                     best_score, best_lag = rmse, l
110             except ValueError as v:
111                 print(v)
112         # print('Best VAR%s RMSE=%.3f' % (best_lag, best_score))
113         return best_lag
114
115
116 def train_cid(cid):
117     """
118      Read the input file(based on cell id received from the main program)
119      call process() to forecast the downlink and uplink of the input cell id
120      Make a VAR model, call the fit method with the desired lag order.
121     """
122     # print(f'Training for {cid}')
123     db.read_data(cellid=cid, limit=4800)
124     md = PROCESS(db.data)
125     if md.data is not None and not md.constant():
126         md.process()
127         lag = md.optimize_lag(md.data)
128         model = VAR(md.data)          # Make a VAR model
129         try:
130             model_fit = model.fit(lag)            # call fit method with lag order
131             file_name = 'src/'+cid.replace('/', '')
132             with open(file_name, 'wb') as f:
133                 joblib.dump(model_fit, f)     # Save the model with the cell id name
134         except ValueError as v:
135             print("****************************************", v)
136
137
138 def train(database, cid):
139     global db
140     db = database
141     train_cid(cid)