Release 0.0.6
[ric-app/qp.git] / src / qptrain.py
1 # ==================================================================================
2 #  Copyright (c) 2020 HCL Technologies Limited.
3 #
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 # ==================================================================================
16
17 from statsmodels.tsa.api import VAR
18 from statsmodels.tsa.stattools import adfuller
19 from mdclogpy import Logger
20 from exceptions import DataNotMatchError
21 from sklearn.metrics import mean_squared_error
22 from math import sqrt
23 import pandas as pd
24 import joblib
25 import warnings
26 warnings.filterwarnings("ignore")
27
28 logger = Logger(name=__name__)
29
30
31 class PROCESS(object):
32
33     def __init__(self, data):
34         self.diff = 0
35         self.data = data
36
37     def input_data(self):
38         try:
39             self.data = self.data[db.thptparam]
40             self.data = self.data.fillna(method='bfill')
41         except DataNotMatchError:
42             logger.error('Parameters Downlink throughput and Uplink throughput does not exist in provided data')
43             self.data = None
44
45     def adfuller_test(self, series, thresh=0.05, verbose=False):
46         """ADFuller test for Stationarity of given series and return True or False"""
47         r = adfuller(series, autolag='AIC')
48         output = {'test_statistic': round(r[0], 4), 'pvalue': round(r[1], 4), 'n_lags': round(r[2], 4), 'n_obs': r[3]}
49         p_value = output['pvalue']
50         if p_value <= thresh:
51             return True
52         else:
53             return False
54
55     def make_stationary(self):
56         """ call adfuller_test() to check for stationary
57             If the column is stationary, perform 1st differencing and return data"""
58         df = self.data.copy()
59         res_adf = []
60         for name, column in df.iteritems():
61             res_adf.append(self.adfuller_test(column))  # Perform ADF test
62         if not all(res_adf):
63             self.data = df.diff().dropna()
64             self.diff += 1
65
66     def invert_transformation(self, inp, forecast):
67         """Revert back the differencing to get the forecast to original scale."""
68         if self.diff == 0:
69             return forecast
70         df = forecast.copy()
71         columns = inp.columns
72         for col in columns:
73             df[col] = inp[col].iloc[-1] + df[col].cumsum()
74         self.diff = 0
75         return df
76
77     def process(self):
78         self.input_data()
79         self.make_stationary()  # check for Stationarity and make the Time Series Stationary
80
81     def constant(self):
82         val = True
83         df = self.data.copy()
84         df = df[db.thptparam]
85         df = df.drop_duplicates()
86         df = df.loc[:, df.apply(pd.Series.nunique) != 1]
87         if df is not None:
88             df = df.dropna()
89             df = df.loc[:, (df != 0).any(axis=0)]
90             if len(df) >= 10:
91                 val = False
92         return val
93
94     def evaluate_var(self, X, lag):
95         # prepare training dataset
96         train_size = int(len(X) * 0.75)
97         train, test = X[0:train_size], X[train_size:]
98         # make predictions
99         model = VAR(train)
100         model_fit = model.fit(lag)
101         predictions = model_fit.forecast(y=train.values, steps=len(test))
102         # calculate out of sample error
103         rmse = sqrt(mean_squared_error(test, predictions))
104         return rmse
105
106     def optimize_lag(self, df):
107         lag = range(1, 20, 1)
108         df = df.astype('float32')
109         best_score, best_lag = float("inf"), None
110         for l in lag:
111             try:
112                 rmse = self.evaluate_var(df, l)
113                 if rmse < best_score:
114                     best_score, best_lag = rmse, l
115             except ValueError as v:
116                 print(v)
117         # print('Best VAR%s RMSE=%.3f' % (best_lag, best_score))
118         return best_lag
119
120
121 def train_cid(cid):
122     """
123      Read the input file(based on cell id received from the main program)
124      call process() to forecast the downlink and uplink of the input cell id
125      Make a VAR model, call the fit method with the desired lag order.
126     """
127     # print(f'Training for {cid}')
128     db.read_data(cellid=cid, limit=4800)
129     md = PROCESS(db.data)
130     if md.data is not None and not md.constant():
131         md.process()
132         lag = md.optimize_lag(md.data)
133         model = VAR(md.data)          # Make a VAR model
134         try:
135             model_fit = model.fit(lag)            # call fit method with lag order
136             file_name = 'src/'+cid.replace('/', '')
137             with open(file_name, 'wb') as f:
138                 joblib.dump(model_fit, f)     # Save the model with the cell id name
139         except ValueError as v:
140             print("****************************************", v)
141
142
143 def train(database, cid):
144     global db
145     db = database
146     train_cid(cid)