1 # ==================================================================================
2 # Copyright (c) 2020 HCL Technologies Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
17 from statsmodels.tsa.api import VAR
18 from statsmodels.tsa.stattools import adfuller
19 from mdclogpy import Logger
20 from exceptions import DataNotMatchError
21 from sklearn.metrics import mean_squared_error
26 warnings.filterwarnings("ignore")
28 logger = Logger(name=__name__)
31 class PROCESS(object):
33 def __init__(self, data):
39 self.data = self.data[db.thptparam]
40 self.data = self.data.fillna(method='bfill')
41 except DataNotMatchError:
42 logger.error('Parameters Downlink throughput and Uplink throughput does not exist in provided data')
45 def adfuller_test(self, series, thresh=0.05, verbose=False):
46 """ADFuller test for Stationarity of given series and return True or False"""
47 r = adfuller(series, autolag='AIC')
48 output = {'test_statistic': round(r[0], 4), 'pvalue': round(r[1], 4), 'n_lags': round(r[2], 4), 'n_obs': r[3]}
49 p_value = output['pvalue']
55 def make_stationary(self):
56 """ call adfuller_test() to check for stationary
57 If the column is stationary, perform 1st differencing and return data"""
60 for name, column in df.iteritems():
61 res_adf.append(self.adfuller_test(column)) # Perform ADF test
63 self.data = df.diff().dropna()
66 def invert_transformation(self, inp, forecast):
67 """Revert back the differencing to get the forecast to original scale."""
73 df[col] = inp[col].iloc[-1] + df[col].cumsum()
79 self.make_stationary() # check for Stationarity and make the Time Series Stationary
85 df = df.drop_duplicates()
86 df = df.loc[:, df.apply(pd.Series.nunique) != 1]
89 df = df.loc[:, (df != 0).any(axis=0)]
94 def evaluate_var(self, X, lag):
95 # prepare training dataset
96 train_size = int(len(X) * 0.75)
97 train, test = X[0:train_size], X[train_size:]
100 model_fit = model.fit(lag)
101 predictions = model_fit.forecast(y=train.values, steps=len(test))
102 # calculate out of sample error
103 rmse = sqrt(mean_squared_error(test, predictions))
106 def optimize_lag(self, df):
107 lag = range(1, 20, 1)
108 df = df.astype('float32')
109 best_score, best_lag = float("inf"), None
112 rmse = self.evaluate_var(df, l)
113 if rmse < best_score:
114 best_score, best_lag = rmse, l
115 except ValueError as v:
117 # print('Best VAR%s RMSE=%.3f' % (best_lag, best_score))
123 Read the input file(based on cell id received from the main program)
124 call process() to forecast the downlink and uplink of the input cell id
125 Make a VAR model, call the fit method with the desired lag order.
127 # print(f'Training for {cid}')
128 db.read_data(cellid=cid, limit=4800)
129 md = PROCESS(db.data)
130 if md.data is not None and not md.constant():
132 lag = md.optimize_lag(md.data)
133 model = VAR(md.data) # Make a VAR model
135 model_fit = model.fit(lag) # call fit method with lag order
136 file_name = 'src/'+cid.replace('/', '')
137 with open(file_name, 'wb') as f:
138 joblib.dump(model_fit, f) # Save the model with the cell id name
139 except ValueError as v:
140 print("****************************************", v)
143 def train(database, cid):