1 # ==================================================================================
2 # Copyright (c) 2020 HCL Technologies Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
17 from statsmodels.tsa.api import VAR
18 from statsmodels.tsa.stattools import adfuller
19 from mdclogpy import Logger
20 from exceptions import DataNotMatchError
21 from sklearn.metrics import mean_squared_error
25 warnings.filterwarnings("ignore")
27 logger = Logger(name=__name__)
30 class PROCESS(object):
32 def __init__(self, data):
38 self.data = self.data[db.thptparam]
39 self.data = self.data.fillna(method='bfill')
40 except DataNotMatchError:
41 logger.error('Parameters Downlink throughput and Uplink throughput does not exist in provided data')
44 def adfuller_test(self, series, thresh=0.05, verbose=False):
45 """ADFuller test for Stationarity of given series and return True or False"""
46 r = adfuller(series, autolag='AIC')
47 output = {'test_statistic': round(r[0], 4), 'pvalue': round(r[1], 4), 'n_lags': round(r[2], 4), 'n_obs': r[3]}
48 p_value = output['pvalue']
54 def make_stationary(self):
55 """ call adfuller_test() to check for stationary
56 If the column is stationary, perform 1st differencing and return data"""
59 for name, column in df.iteritems():
60 res_adf.append(self.adfuller_test(column)) # Perform ADF test
62 self.data = df.diff().dropna()
65 def invert_transformation(self, inp, forecast):
66 """Revert back the differencing to get the forecast to original scale."""
72 df[col] = inp[col].iloc[-1] + df[col].cumsum()
78 self.make_stationary() # check for Stationarity and make the Time Series Stationary
83 df = df.drop_duplicates().dropna()
84 df = df.loc[:, (df != 0).any(axis=0)]
89 def evaluate_var(self, X, lag):
90 # prepare training dataset
91 train_size = int(len(X) * 0.75)
92 train, test = X[0:train_size], X[train_size:]
95 model_fit = model.fit(lag)
96 predictions = model_fit.forecast(y=train.values, steps=len(test))
97 # calculate out of sample error
98 rmse = sqrt(mean_squared_error(test, predictions))
101 def optimize_lag(self, df):
102 lag = range(1, 20, 1)
103 df = df.astype('float32')
104 best_score, best_lag = float("inf"), None
107 rmse = self.evaluate_var(df, l)
108 if rmse < best_score:
109 best_score, best_lag = rmse, l
110 except ValueError as v:
112 # print('Best VAR%s RMSE=%.3f' % (best_lag, best_score))
118 Read the input file(based on cell id received from the main program)
119 call process() to forecast the downlink and uplink of the input cell id
120 Make a VAR model, call the fit method with the desired lag order.
122 # print(f'Training for {cid}')
123 db.read_data(cellid=cid, limit=4800)
124 md = PROCESS(db.data)
125 if md.data is not None and not md.constant():
127 lag = md.optimize_lag(md.data)
128 model = VAR(md.data) # Make a VAR model
130 model_fit = model.fit(lag) # call fit method with lag order
131 file_name = 'src/'+cid.replace('/', '')
132 with open(file_name, 'wb') as f:
133 joblib.dump(model_fit, f) # Save the model with the cell id name
134 except ValueError as v:
135 print("****************************************", v)
138 def train(database, cid):