X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=qp%2Fqptrain.py;fp=qp%2Fqptrain.py;h=68dcb35912ef4608d4d31da6c366cd5260988e20;hb=a1cb1abac5f5979f507b6760f0862c38c3ba2347;hp=0000000000000000000000000000000000000000;hpb=cd32fb9b8f63431086809c542b6dab26c8ea09b1;p=ric-app%2Fqp.git diff --git a/qp/qptrain.py b/qp/qptrain.py new file mode 100644 index 0000000..68dcb35 --- /dev/null +++ b/qp/qptrain.py @@ -0,0 +1,91 @@ +# ================================================================================== +# Copyright (c) 2020 HCL Technologies Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== + +from statsmodels.tsa.api import VAR +from statsmodels.tsa.stattools import adfuller +import joblib + + +class PROCESS(object): + + def __init__(self, data): + self.diff = 0 + self.data = data + + def adfuller_test(self, series, thresh=0.05, verbose=False): + """ADFuller test for Stationarity of given series and return True or False""" + r = adfuller(series, autolag='AIC') + output = {'test_statistic': round(r[0], 4), 'pvalue': round(r[1], 4), 'n_lags': round(r[2], 4), 'n_obs': r[3]} + p_value = output['pvalue'] + if p_value <= thresh: + return True + else: + return False + + def make_stationary(self): + """ call adfuller_test() to check for stationary + If the column is stationary, perform 1st differencing and return data""" + df = self.data.copy() + res_adf = [] + for name, column in df.iteritems(): + res_adf.append(self.adfuller_test(column)) # Perform ADF test + if not all(res_adf): + self.data = df.diff().dropna() + self.diff += 1 + + def invert_transformation(self, inp, forecast): + """Revert back the differencing to get the forecast to original scale.""" + if self.diff == 0: + return forecast + df = forecast.copy() + columns = inp.columns + for col in columns: + df[col] = inp[col].iloc[-1] + df[col].cumsum() + self.diff = 0 + return df + + def process(self): + """ Filter throughput parameters, call make_stationary() to check for Stationarity time series + """ + df = self.data.copy() + df = df[['pdcpBytesDl', 'pdcpBytesUl']] + self.data = df.loc[:, (df != 0).any(axis=0)] + self.make_stationary() # check for Stationarity and make the Time Series Stationary + + def valid(self): + df = self.data.copy() + df = df.loc[:, (df != 0).any(axis=0)] + if len(df) != 0 and df.shape[1] == 2: + return True + else: + return False + + +def train(db, cid): + """ + Read the input file(based on cell id received from the main program) + call process() to forecast the downlink and uplink of the input cell id + Make a VAR model, call the fit method with the desired lag order. + """ + db.read_data(meas='liveCell', cellid=cid) + md = PROCESS(db.data) + md.process() + if md.valid(): + model = VAR(md.data) # Make a VAR model + model_fit = model.fit(10) # call fit method with lag order + file_name = 'qp/'+cid.replace('/', '') + with open(file_name, 'wb') as f: + joblib.dump(model_fit, f) # Save the model with the cell id name