3629ea8376581ad46853838346a08ea87eaf4603
[nonrtric.git] / test / usecases / linkfailure / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2021 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 import argparse
20 import ast
21 import requests
22 import json
23 import time
24
25 SDNR_PATH = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=O-RAN-DU-01/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection="
26
27 UNLOCK_MESSAGE = {
28     "o-ran-sc-du-hello-world:du-to-ru-connection": [
29         {
30             "name":"",
31             "administrative-state":"UNLOCKED"
32         }
33     ]
34 }
35
36
37 def is_message_new_link_failure(message):
38     msg_as_json = json.loads(message)
39     event_headers = msg_as_json["event"]["commonEventHeader"]
40
41     link_failure = False
42     if (event_headers["domain"] == "fault"):
43         fault_fields = msg_as_json["event"]["faultFields"]
44         link_failure = fault_fields["specificProblem"] == "CUS Link Failure" and fault_fields["eventSeverity"] == "CRITICAL"
45
46     return link_failure
47
48
49 def is_message_clear_link_failure(message):
50     msg_as_json = json.loads(message)
51     event_headers = msg_as_json["event"]["commonEventHeader"]
52
53     link_failure_clear = False
54     if (event_headers["domain"] == "fault"):
55         fault_fields = msg_as_json["event"]["faultFields"]
56         link_failure_clear = fault_fields["specificProblem"] == "CUS Link Failure" and fault_fields["eventSeverity"] == "NORMAL"
57
58     return link_failure_clear
59
60
61 def handle_link_failure(message, o_ru_to_o_du_map, sdnr_address):
62     verboseprint("Got a link failure: ")
63     alarm_msg_as_json = json.loads(message)
64     event_headers = alarm_msg_as_json["event"]["commonEventHeader"]
65     o_ru_id = event_headers["reportingEntityId"]
66     verboseprint("O-RU ID: " + o_ru_id)
67     o_du_id = o_ru_to_o_du_map[o_ru_id]
68     verboseprint("O-DU ID: " + o_du_id)
69     unlock_msg = json.loads(json.dumps(UNLOCK_MESSAGE))
70     unlock_msg["o-ran-sc-du-hello-world:du-to-ru-connection"][0]["name"] = o_du_id
71     response = requests.post(sdnr_address + SDNR_PATH + o_du_id, json=unlock_msg)
72     print(response)
73
74
75 def handle_clear_link_failure(message):
76     msg_as_json = json.loads(message)
77     event_headers = msg_as_json["event"]["commonEventHeader"]
78     o_ru_id = event_headers["reportingEntityId"]
79     verboseprint("Cleared Link Failure for O-RU ID: " + o_ru_id)
80
81
82 def read_o_ru_to_o_du_map_from_file(map_file):
83     file = open(map_file, "r")
84     contents = file.read()
85     dictionary = ast.literal_eval(contents)
86     file.close()
87     return dictionary
88
89
90 if __name__ == '__main__':
91     parser = argparse.ArgumentParser(prog='PROG')
92     parser.add_argument('--mrHost', help='The URL of the MR host', default="http://message-router.onap")
93     parser.add_argument('--mrPort', help='The port of the MR host', type=int, default=3904)
94     parser.add_argument('--mrTopic', help='The topic to poll messages from', default="ALARMS-WRITE")
95     parser.add_argument('--sdnrHost', help='The URL of the SNDR host', default="http://localhost")
96     parser.add_argument('--sdnrPort', help='The port of the SDNR host', type=int, default=9990)
97     parser.add_argument('--oRuTooDuMapFile', help='A file with the mapping between O-RU ID and O-DU ID as a dictionary', default="o-ru-to-o-du-map.txt")
98     parser.add_argument('--pollTime', help='The time between polls', type=int, default=10)
99     parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
100     parser.add_argument('--version', action='version', version='%(prog)s 1.0')
101     args = vars(parser.parse_args())
102     mr_host = args["mrHost"]
103     mr_port = args["mrPort"]
104     mr_topic = args["mrTopic"]
105     sdnr_host = args["sdnrHost"]
106     sdnr_port = args["sdnrPort"]
107     o_ru_to_o_du_map = read_o_ru_to_o_du_map_from_file(args["oRuTooDuMapFile"])
108     pollTime = args["pollTime"]
109
110     if args["verbose"]:
111
112         def verboseprint(*args, **kwargs):
113             print(*args, **kwargs)
114
115     else:
116         verboseprint = lambda *a, **k: None  # do-nothing function
117
118     verboseprint("Using MR address: " + mr_host + ":" + str(mr_port) + " and topic: " + mr_topic)
119     verboseprint("Using SDNR address: " + sdnr_host + ":" + str(sdnr_port))
120     verboseprint("Starting with " + str(pollTime) + " seconds between polls")
121     mr_address = mr_host + ":" + str(mr_port) + "/events/" + mr_topic + "/users/test/"
122     sdnr_address = sdnr_host + ":" + str(sdnr_port)
123
124     while True:
125         response = requests.get(mr_address)
126         verboseprint("Polling")
127         messages = response.json()
128         for message in messages:
129             if (is_message_new_link_failure(message)):
130                 handle_link_failure(message, o_ru_to_o_du_map, sdnr_address)
131             elif (is_message_clear_link_failure(message)):
132                 handle_clear_link_failure(message)
133
134         time.sleep(pollTime)