--- /dev/null
+# ==================================================================================
+# Copyright (c) 2020 HCL Technologies Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+
+from statsmodels.tsa.api import VAR
+from statsmodels.tsa.stattools import adfuller
+import joblib
+
+
+class PROCESS(object):
+
+ def __init__(self, data):
+ self.diff = 0
+ self.data = data
+
+ def adfuller_test(self, series, thresh=0.05, verbose=False):
+ """ADFuller test for Stationarity of given series and return True or False"""
+ r = adfuller(series, autolag='AIC')
+ output = {'test_statistic': round(r[0], 4), 'pvalue': round(r[1], 4), 'n_lags': round(r[2], 4), 'n_obs': r[3]}
+ p_value = output['pvalue']
+ if p_value <= thresh:
+ return True
+ else:
+ return False
+
+ def make_stationary(self):
+ """ call adfuller_test() to check for stationary
+ If the column is stationary, perform 1st differencing and return data"""
+ df = self.data.copy()
+ res_adf = []
+ for name, column in df.iteritems():
+ res_adf.append(self.adfuller_test(column)) # Perform ADF test
+ if not all(res_adf):
+ self.data = df.diff().dropna()
+ self.diff += 1
+
+ def invert_transformation(self, inp, forecast):
+ """Revert back the differencing to get the forecast to original scale."""
+ if self.diff == 0:
+ return forecast
+ df = forecast.copy()
+ columns = inp.columns
+ for col in columns:
+ df[col] = inp[col].iloc[-1] + df[col].cumsum()
+ self.diff = 0
+ return df
+
+ def process(self):
+ """ Filter throughput parameters, call make_stationary() to check for Stationarity time series
+ """
+ df = self.data.copy()
+ df = df[['pdcpBytesDl', 'pdcpBytesUl']]
+ self.data = df.loc[:, (df != 0).any(axis=0)]
+ self.make_stationary() # check for Stationarity and make the Time Series Stationary
+
+ def valid(self):
+ df = self.data.copy()
+ df = df.loc[:, (df != 0).any(axis=0)]
+ if len(df) != 0 and df.shape[1] == 2:
+ return True
+ else:
+ return False
+
+
+def train(db, cid):
+ """
+ Read the input file(based on cell id received from the main program)
+ call process() to forecast the downlink and uplink of the input cell id
+ Make a VAR model, call the fit method with the desired lag order.
+ """
+ db.read_data(meas='liveCell', cellid=cid)
+ md = PROCESS(db.data)
+ md.process()
+ if md.valid():
+ model = VAR(md.data) # Make a VAR model
+ model_fit = model.fit(10) # call fit method with lag order
+ file_name = 'qp/'+cid.replace('/', '')
+ with open(file_name, 'wb') as f:
+ joblib.dump(model_fit, f) # Save the model with the cell id name