bc9e4b50fc975d3e0cf11bde7820c60f4a95a78d
[nonrtric.git] / test / usecases / linkfailure / simulators / sdnc_simulator.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2021 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
import json
import random
import threading
import time

import requests
from flask import Flask, request
from flask import Response
25
# Provides an endpoint for the "UNLOCK" configuration change for an O-DU.
# For each request, the name of the first connected O-RU is read from the payload and an
# AlarmClearThread is started with a randomly chosen delay of 0 to 9 whole seconds. The
# thread posts an Alarm Notification that clears the "CUS Link Failure" alarm event to MR.
app = Flask(__name__)

# Server info
# "::" is the IPv6 unspecified (wildcard) address; it is passed as ``host`` to app.run().
HOST_IP = "::"
HOST_PORT = 9990
# Common prefix of the configuration-change route; note that the node name O-RAN-DU-01
# is hard-coded in the path.
APP_URL = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=O-RAN-DU-01/yang-ext:mount/"
35
# Template of the fault event that AlarmClearThread posts to clear the "CUS Link Failure"
# alarm. The thread works on a deep copy and overwrites only
# event.commonEventHeader.reportingEntityId with the O-RU ID; every other field is sent
# exactly as written here.
# NOTE(review): the "@...@" placeholder values (e.g. "@timestamp@") are not substituted
# anywhere in this file, so they are sent verbatim - confirm that the receiver accepts this.
linkFailureMessage = {
    "event": {
        "commonEventHeader": {
            "domain": "fault",
            "eventId": "nt:network-topology/nt:topology/nt:node/nt:node-id",
            "eventName": "fault_O-RAN-RU-Fault_Alarms_CUS_Link_Failure",
            "eventType": "O-RAN-RU-Fault_Alarms",
            "sequence": 0,
            "priority": "High",
            "reportingEntityId": "uro1",
            "reportingEntityName": "@controllerName@",
            "sourceId": "",
            "sourceName": "nt:network-topology/nt:topology/nt:node/nt:node-id",
            "startEpochMicrosec": "@timestamp@",
            "lastEpochMicrosec": "@timestamp@",
            "nfNamingCode": "",
            "nfVendorName": "ietf-hardware (RFC8348) /hardware/component[not(parent)][1]/mfg-name",
            "timeZoneOffset": "+00:00",
            "version": "4.1",
            "vesEventListenerVersion": "7.2.1"
        },
        "faultFields": {
            "faultFieldsVersion": "4.0",
            "alarmCondition": "o-ran-fm:alarm-notif/fault-id",
            "alarmInterfaceA": "o-ran-fm:alarm-notif/fault-source",
            "eventSourceType": "ietf-hardware (RFC8348) /hardware/component[not(parent)][1]/mfg-model or \"O-RU\"",
            "specificProblem": "CUS Link Failure",
            "eventSeverity": "NORMAL",
            "vfStatus": "Active",
            "alarmAdditionalInformation": {
                "eventTime": "@eventTime@",
                "equipType": "@type@",
                "vendor": "@vendor@",
                "model": "@model@"
            }
        }
    }
}
74
75
class AlarmClearThread(threading.Thread):
    """Background thread that waits and then posts an alarm-clear event for one O-RU.

    The event is a deep copy of the module-level ``linkFailureMessage`` template with
    ``reportingEntityId`` replaced by the O-RU ID given to the constructor.
    """

    # URL the alarm-clear event is posted to.
    MR_URL = "http://localhost:3904/events/ALARMS-WRITE"
    # Seconds to wait for the POST before giving up; without a timeout, requests can
    # block forever and leave the thread hanging.
    POST_TIMEOUT = 10

    def __init__(self, sleep_time, o_ru_id):
        """Store the delay and the O-RU ID; the thread is not started here.

        Args:
            sleep_time: Delay in seconds to wait in ``run`` before sending the event.
            o_ru_id: Value written into the event's ``reportingEntityId`` field.
        """
        super().__init__()
        self.sleep_time = sleep_time
        self.o_ru_id = o_ru_id

    def run(self):
        """Sleep for ``sleep_time`` seconds, then post the alarm-clear event."""
        print(f'Sleeping: {self.sleep_time} before clearing O-DU: {self.o_ru_id}')
        # Actually wait for the announced delay; previously the message was printed but
        # the event was sent immediately.
        time.sleep(self.sleep_time)
        # The JSON round trip yields a deep copy, so the shared template is never mutated.
        msg_as_json = json.loads(json.dumps(linkFailureMessage))
        msg_as_json["event"]["commonEventHeader"]["reportingEntityId"] = self.o_ru_id
        requests.post(self.MR_URL, json=msg_as_json, timeout=self.POST_TIMEOUT)
88
89
90 # I'm alive function
@app.route('/', methods=['GET'])
def index():
    """Liveness check: answer GET / with the body 'OK' and HTTP status 200."""
    body = 'OK'
    status_code = 200
    return body, status_code
95
96
@app.route(APP_URL + "o-ran-sc-du-hello-world:network-function/<id>",
    methods=['POST'])
def sendrequest(id):
    """Accept a configuration change for an O-DU and start the delayed alarm clear.

    Args:
        id: Last URL path segment, expected in the form ``<key>=<O-DU ID>``. The
            parameter name must stay ``id`` because Flask passes the ``<id>`` URL
            variable to the view as a keyword argument.

    Returns:
        An empty 201 response when the request was accepted, or a 400 response when
        the path segment or the JSON payload does not have the expected shape.
    """
    id_parts = id.split("=")
    if len(id_parts) < 2:
        # No "=" in the path segment: report a client error instead of failing with 500.
        return Response("Expected a path segment of the form <key>=<O-DU ID>", status=400)
    o_du_id = id_parts[1]
    print("Config change for O-DU with ID " + o_du_id)

    # silent=True yields None for a missing or unparsable JSON body; that case is
    # rejected together with all other malformed payloads below.
    payload = request.get_json(silent=True)
    try:
        o_ru_id = payload["o-ran-sc-du-hello-world:du-to-ru-connection"][0]["name"]
    except (KeyError, IndexError, TypeError):
        return Response(
            "Payload must contain o-ran-sc-du-hello-world:du-to-ru-connection[0].name",
            status=400)

    # Whole number of seconds in the range 0..9.
    random_time = int(10 * random.random())
    AlarmClearThread(random_time, o_ru_id).start()

    return Response(status=201)
109
110
if __name__ == "__main__":
    # Run the Flask server on the configured address and port when executed as a script.
    app.run(host=HOST_IP, port=HOST_PORT)