1 # ==================================================================================
2 # Copyright (c) 2020 HCL Technologies Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
22 from ricxappframe.xapp_frame import Xapp, rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from mdclogpy import Logger
25 from ad_model import modelling, CAUSE
26 from ad_train import ModelTraining
27 from database import DATABASE, DUMMY
# Shared Data Layer handle; use_fake_sdl=True selects the in-memory fake
# backend, so no real DBaaS connection is needed.
sdl = SDLWrapper(use_fake_sdl=True)

# Module-level structured logger (mdclogpy).
logger = Logger(name=__name__)
    # NOTE(review): this excerpt is truncated — the enclosing function header
    # (presumably `def entry(self):`) and several body lines are not visible;
    # confirm against the full source before relying on this fragment.
    """ If ML model is not present in the path, It will trigger training module to train the model.
    Calls predict function every 10 millisecond(for now as we are using simulated data).
    """
    # Schedule the prediction step every 0.5 s.  The docstring above says
    # 10 ms — the two disagree; presumably the docstring is stale. TODO confirm.
    schedule.every(0.5).seconds.do(predict, self)
    # Presumably executed inside a loop (not visible here): run due jobs.
    schedule.run_pending()
    # NOTE(review): truncated excerpt — the enclosing function header and the
    # lines around these are missing; confirm nesting against the full source.
    logger.info("throughput threshold parameter is set as {}% (default)".format(threshold))
    # Train a model only when no serialized model exists at 'src/model'.
    if not os.path.isfile('src/model'):
        mt = ModelTraining(db)
        # NOTE(review): the lines following (presumably invoking the training
        # run on `mt`) are missing from this excerpt.
    """Read the latest ue sample from influxDB and detects if that is anomalous or normal..
    Send the UEID, DUID, Degradation type and timestamp for the anomalous samples to Traffic Steering (rmr with the message type as 30003)
    Get the acknowledgement of sent message from the traffic steering.
    """
    # NOTE(review): this excerpt is truncated — the enclosing `def predict(self):`
    # header, the data-fetch call, and the `else:` lines that presumably guard
    # the two warnings below are not visible; nesting here is approximate.
    if db.data is not None:
        if set(md.num).issubset(db.data.columns):
            # Drop rows with missing values before prediction.
            db.data = db.data.dropna(axis=0)
            val = predict_anomaly(self, db.data)
        # presumably under an `else:` in the full source — TODO confirm
        logger.warning("Parameters does not match with of training data")
    # presumably under an `else:` in the full source — TODO confirm
    logger.warning("No data in last 1 second")
    # val > 2 chars means a non-empty JSON array (i.e. at least one anomaly).
    if (val is not None) and (len(val) > 2):
        # NOTE(review): body missing from this excerpt — presumably
        # msg_to_ts(self, val).
def predict_anomaly(self, df):
    """ calls ad_predict to detect if given sample is normal or anomalous
    find out the degradation type if sample is anomalous
    write given sample along with predicted label to AD measurement

    Parameters (excerpt; some docstring lines are missing from this listing):
    ue: array or dataframe

    Return:
    val: anomalus sample info(UEID, DUID, TimeStamp, Degradation type)
    """
    # Label every row: md.predict marks anomalous samples with 1.
    df['Anomaly'] = md.predict(df)
    df.loc[:, 'Degradation'] = ''
    if 1 in df.Anomaly.unique():
        # Refine the anomaly flags and attach a degradation cause, using the
        # module-level throughput `threshold`.
        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold)
        # NOTE(review): nesting of the following lines is approximate — lines
        # are missing from this excerpt (possibly a `len(df_a) > 0` guard).
        df_a = df.loc[df['Anomaly'] == 1].copy()
        df_a['time'] = df_a.index
        cols = [db.ue, 'time', 'Degradation']
        # rmr send 30003(TS_ANOMALY_UPDATE), should trigger registered callback
        result = json.loads(df_a.loc[:, cols].to_json(orient='records'))
        val = json.dumps(result).encode()
    # Normalize dtype and re-index with a 1 ms-spaced time index before the
    # (not visible) write to the AD measurement.
    df.loc[:, 'RRU.PrbUsedDl'] = df['RRU.PrbUsedDl'].astype('float')
    df.index = pd.date_range(start=df.index[0], periods=len(df), freq='1ms')
    # NOTE(review): trailing lines (writing df back to influx and `return val`)
    # are missing from this excerpt.
def msg_to_ts(self, val):
    # Send the anomalous-UE payload from AD to Traffic Steering over RMR
    # (message type 30003 = TS_ANOMALY_UPDATE).
    logger.debug("Sending Anomalous UE to TS")
    success = self.rmr_send(val, 30003)
    # NOTE(review): a guard line (likely `if success:`) appears to be missing
    # from this excerpt between the send and the log below.
    logger.info(" Message to TS: message sent Successfully")
    # rmr receive to get the acknowledgement message from the traffic steering.
    for summary, sbuf in self.rmr_get_messages():
        if sbuf.contents.mtype == 30004:
            logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
        # 20010 = A1 policy request; delegate to the A1 handler.
        if sbuf.contents.mtype == 20010:
            a1_request_handler(self, summary, sbuf)
        # NOTE(review): buffer-free / loop-exit lines appear to be missing
        # from this excerpt.
def connectdb(thread=False):
    # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
    # NOTE(review): several lines are missing from this excerpt (presumably the
    # `global db` declaration, the DATABASE()/DUMMY() selection based on
    # `thread`, and a retry loop around the connect call).
    success = db.connect()
def a1_request_handler(self, summary, sbuf):
    # Handle an inbound A1 policy message: parse, verify, apply the new
    # threshold, and send a policy response (mtype 20011).
    logger.info("A1 policy received")
    # NOTE(review): this excerpt is truncated — the opening `try:` line and
    # the early-return lines after the except block are not visible here;
    # confirm against the full source.
    req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
    logger.debug("A1PolicyHandler.resp_handler:: Handler processing request")
    except (json.decoder.JSONDecodeError, KeyError):
        logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request")
    if verifyPolicy(req):
        logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req))
    # NOTE(review): an `else:` branch (with a return) is likely missing before
    # the next line — confirm against the full source.
    logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req))
    logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req))
    change_threshold(self, req)
    resp = buildPolicyResp(self, req)
    self.rmr_send(json.dumps(resp).encode(), 20011)
    logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp))
def change_threshold(self, req: dict):
    """Update the module-level throughput ``threshold`` from an A1 policy.

    Parameters:
        req: parsed A1 policy request. For a "CREATE" operation,
            ``req["payload"]`` is a JSON string whose ``db.a1_param`` key
            carries the new threshold percentage.

    Raises:
        KeyError / json.JSONDecodeError: if the payload is malformed
            (propagated to the caller, matching the original behavior).
    """
    # BUG FIX: without `global`, the assignment below bound a *local*
    # `threshold`, so the A1 policy update silently never affected the
    # module-level value read by predict_anomaly().
    global threshold
    if req["operation"] == "CREATE":
        payload = req["payload"]
        threshold = json.loads(payload)[db.a1_param]
        logger.info("throughput threshold parameter updated to: {}% ".format(threshold))
def verifyPolicy(req: dict):
    # Validate that the A1 request carries all mandatory keys.
    # NOTE(review): the loop body and return statements are missing from this
    # excerpt (likely `if i not in req: return False` then `return True`);
    # confirm against the full source.
    for i in ["policy_type_id", "operation", "policy_instance_id"]:
def buildPolicyResp(self, req: dict):
    # Build the A1 policy response by tagging this xApp's handler id onto the
    # request dict (mutated in place).
    req["handler_id"] = "ad"
    # NOTE(review): remaining lines (clearing the payload, setting the status,
    # and the return) are missing from this excerpt.
def start(thread=False):
    # Initiates xapp api and runs the entry() using xapp.run()
    # NOTE(review): `thread` is unused in the visible lines, and the function
    # is cut off after the debug log — `xapp.run()` presumably follows.
    xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
    logger.debug("AD xApp starting")