1 # ==================================================================================
2 # Copyright (c) 2020 HCL Technologies Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
19 from ricxappframe.xapp_frame import Xapp
21 from ad_model.tb_format import parse
22 from ad_model.ad_model import HDB_PREDICT
24 from ad_train import train
29 If the RF model is not present at 'ad/RF', run train() to train the model used for prediction.
30 Calls the predict function every 1 second (for now, as we are using simulated data).
# NOTE(review): the function header for these statements is not visible in this listing;
# the Xapp(entrypoint=entry, ...) call further down suggests they belong to `entry` - confirm.
# Check whether a model file already exists at 'ad/RF'. The description above says train()
# runs when it is absent; that call is not visible here.
32 if not os.path.isfile('ad/RF'):
# Register predict(self) as a scheduler job with a 1-second interval.
34 schedule.every(1).seconds.do(predict, self)
# Run whichever scheduled jobs are currently due.
# NOTE(review): a single call only services jobs due right now; presumably this sits inside a loop - confirm.
36 schedule.run_pending()
41 Read the input csv file that has both normal and anomalous data.
42 Simulate different UEIDs that participate in the anomaly by randomly selecting records from this scoring data set.
43 Send the UEID and timestamp of the anomalous entries to Traffic Steering (via rmr, with message type 30003).
44 Get the acknowledgement message from Traffic Steering.
# NOTE(review): the function header is not visible in this listing; the scheduler call
# do(predict, self) suggests this statement belongs to `predict` - confirm.
# Ask predict_anomaly for its result; whatever consumes `val` afterwards is not visible here.
46 val = predict_anomaly(self)
def predict_anomaly(self):
    """Build the encoded anomaly record that is handed to Traffic Steering.

    Reads the scoring data from 'ad/ue_test.csv', parses it, runs
    HDB_PREDICT on the parsed data and keeps only the first row whose
    'Anomaly' column equals 1, restricted to the 'UEID' and
    'MeasTimestampRF' columns.

    Args:
        self: Not used by this function; kept so the call signature stays
            unchanged for existing callers.

    Returns:
        bytes: JSON array of records, encoded with str.encode(). When no
        row is flagged as anomalous the result is b'[]'.
    """
    # The read_csv logic will be modified when we are going to fetch the data from database via sdl api.
    ue_data = pd.read_csv('ad/ue_test.csv')

    # Parse the ue data and predict the anomaly records for the randomly selected UEID.
    # NOTE(review): `data` was previously used without ever being assigned. parse() is the
    # parsing helper this file imports; it is assumed to take the raw frame - confirm
    # against ad_model.tb_format.
    data = parse(ue_data)
    db_df = HDB_PREDICT(data)

    # One .loc call selects rows and columns together; copy() gives an independent frame
    # so the column assignment below does not write into a slice of the prediction output.
    anomalies = db_df.loc[db_df['Anomaly'] == 1, ['UEID', 'MeasTimestampRF']].head(1).copy()
    anomalies['MeasTimestampRF'] = anomalies['MeasTimestampRF'].apply(str)  # converts into string format

    # Payload for the rmr message of type 30003 (TS_ANOMALY_UPDATE). The records are
    # re-serialised with json.dumps so the byte layout of the payload stays as before.
    result = json.loads(anomalies.to_json(orient='records'))
    return json.dumps(result).encode()
def msg_to_ts(self, val):
    """Send an anomaly payload to Traffic Steering and print any replies.

    Args:
        self: Object providing rmr_send, rmr_get_messages and rmr_free
            (presumably the Xapp created in this file - confirm).
        val: Payload passed unchanged to rmr_send with message type 30003.

    Returns:
        None.
    """
    # send message from ad to ts
    print("rmr send value:", val)
    self.rmr_send(val, 30003)

    # rmr receive to get the acknowledgement message from the traffic steering.
    for summary, sbuf in self.rmr_get_messages():
        print("TS_ANOMALY_ACK: {}".format(summary))
        # Release the message buffer once its summary has been reported;
        # buffers handed out by rmr_get_messages are otherwise never freed.
        self.rmr_free(sbuf)
80 # Initiates xapp api and runs the entry() using xapp.run()
# Construct the Xapp with `entry` as its entrypoint, RMR port 4560 and use_fake_sdl switched on.
# NOTE(review): no xapp.run() call is visible after this statement in the listing, although
# the comment above mentions one - confirm it exists.
81 xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=True)