Add login to O-RU Closed Loop use case
[nonrtric.git] / test / usecases / oruclosedlooprecovery / scriptversion / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2021 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 import argparse
20 import ast
21 import json
22 import os
23 import requests
24 import time
25
26 MR_PATH = "/events/[TOPIC]/users/test/"
27 SDNR_PATH = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=[O-DU-ID]/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection=[O-RU-ID]"
28 FAUILT_ID = "28"
29
30 UNLOCK_MESSAGE = {
31     "o-ran-sc-du-hello-world:du-to-ru-connection": [
32         {
33             "name":"",
34             "administrative-state":"UNLOCKED"
35         }
36     ]
37 }
38
39
40 def is_message_new_link_failure(message):
41     msg_as_json = json.loads(message)
42     event_headers = msg_as_json["event"]["commonEventHeader"]
43
44     link_failure = False
45     if (event_headers["domain"] == "fault"):
46         fault_fields = msg_as_json["event"]["faultFields"]
47         link_failure = fault_fields["alarmCondition"] == FAUILT_ID and fault_fields["eventSeverity"] != "NORMAL"
48
49     return link_failure
50
51
52 def is_message_clear_link_failure(message):
53     msg_as_json = json.loads(message)
54     event_headers = msg_as_json["event"]["commonEventHeader"]
55
56     link_failure_clear = False
57     if (event_headers["domain"] == "fault"):
58         fault_fields = msg_as_json["event"]["faultFields"]
59         link_failure_clear = fault_fields["alarmCondition"] == FAUILT_ID and fault_fields["eventSeverity"] == "NORMAL"
60
61     return link_failure_clear
62
63
64 def handle_link_failure(message, o_ru_to_o_du_map, sdnr_address, sdnr_user, sdnr_pwd):
65     verboseprint("Got a link failure: ")
66     alarm_msg_as_json = json.loads(message)
67     event_headers = alarm_msg_as_json["event"]["commonEventHeader"]
68     o_ru_id = event_headers["sourceName"]
69     verboseprint("O-RU ID: " + o_ru_id)
70     if o_ru_id in o_ru_to_o_du_map:
71         o_du_id = o_ru_to_o_du_map[o_ru_id]
72         verboseprint("O-DU ID: " + o_du_id)
73         unlock_msg = json.loads(json.dumps(UNLOCK_MESSAGE))
74         unlock_msg["o-ran-sc-du-hello-world:du-to-ru-connection"][0]["name"] = o_ru_id
75         send_path = SDNR_PATH.replace("[O-DU-ID]", o_du_id).replace("[O-RU-ID]", o_ru_id)
76         requests.put(sdnr_address + send_path, auth=(sdnr_user, sdnr_pwd), json=unlock_msg)
77     else:
78         print("ERROR: No mapping for O-RU ID: " + o_ru_id)
79
80
81 def handle_clear_link_failure(message):
82     msg_as_json = json.loads(message)
83     event_headers = msg_as_json["event"]["commonEventHeader"]
84     o_ru_id = event_headers["sourceName"]
85     verboseprint("Cleared Link Failure for O-RU ID: " + o_ru_id)
86
87
88 def read_o_ru_to_o_du_map_from_file(map_file):
89     file = open(map_file, "r")
90     contents = file.read()
91     dictionary = ast.literal_eval(contents)
92     file.close()
93     return dictionary
94
95
96 def poll_and_handle_messages(mr_address, sdnr_address, sdnr_user, sdnr_pwd):
97     while True:
98         try:
99             verboseprint("Polling")
100             response = requests.get(mr_address)
101             messages = response.json()
102             for message in messages:
103                 if (is_message_new_link_failure(message)):
104                     handle_link_failure(message, o_ru_to_o_du_map, sdnr_address, sdnr_user, sdnr_pwd)
105                 elif (is_message_clear_link_failure(message)):
106                     handle_clear_link_failure(message)
107         except Exception as inst:
108             print(inst)
109
110         time.sleep(pollTime)
111
112
113 if __name__ == '__main__':
114     parser = argparse.ArgumentParser(prog='PROG')
115     parser.add_argument('--mrHost', help='The URL of the MR host (default: %(default)s)', default="http://message-router.onap")
116     parser.add_argument('--mrPort', help='The port of the MR host (default: %(default)d)', type=int, default=3904)
117     parser.add_argument('--mrTopic', help='The topic to poll messages from (default: %(default)s)', default="unauthenticated.SEC_FAULT_OUTPUT")
118     parser.add_argument('--sdnrHost', help='The URL of the SNDR host (default: %(default)s)', default="http://localhost")
119     parser.add_argument('--sdnrPort', help='The port of the SDNR host (default: %(default)d)', type=int, default=9990)
120     parser.add_argument('--sdnrUser', help='Username for SDNR', default="admin")
121     parser.add_argument('--sdnrPwd', help='Password for SDNR', default="Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U")
122     parser.add_argument('--oRuTooDuMapFile', help='A file with the mapping between O-RU ID and O-DU ID as a dictionary (default: %(default)s)', default="o-ru-to-o-du-map.txt")
123     parser.add_argument('--pollTime', help='The time between polls (default: %(default)d)', type=int, default=10)
124     parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
125     parser.add_argument('--version', action='version', version='%(prog)s 1.0')
126     args = vars(parser.parse_args())
127     mr_host = args["mrHost"]
128     if os.getenv("MR-HOST") is not None:
129         mr_host = os.getenv("MR-HOST")
130         print("Using MR Host from os: " + mr_host)
131     mr_port = args["mrPort"]
132     if os.getenv("MR-PORT") is not None:
133         mr_port = os.getenv("MR-PORT")
134         print("Using MR Port from os: " + mr_port)
135     mr_topic = args["mrTopic"]
136     sdnr_host = args["sdnrHost"]
137     if os.getenv("SDNR-HOST") is not None:
138         sdnr_host = os.getenv("SDNR-HOST")
139         print("Using SNDR Host from os: " + sdnr_host)
140     sdnr_port = args["sdnrPort"]
141     if os.getenv("SDNR-PORT") is not None:
142         sdnr_port = os.getenv("SDNR-PORT")
143         print("Using SNDR Host from os: " + sdnr_port)
144     sdnr_user = args["sdnrUser"]
145     if os.getenv("SDNR-USER") is not None:
146         sdnr_user = os.getenv("SDNR-USER")
147         print("Using SNDR User from os: " + sdnr_user)
148     sdnr_pwd = args["sdnrPwd"]
149     if os.getenv("SDNR-PWD") is not None:
150         sdnr_pwd = os.getenv("SDNR-PWD")
151         print("Using SNDR Password from os: " + sdnr_pwd)
152     o_ru_to_o_du_map = read_o_ru_to_o_du_map_from_file(args["oRuTooDuMapFile"])
153     pollTime = args["pollTime"]
154
155     if os.getenv("VERBOSE") is not None or args["verbose"]:
156
157         def verboseprint(*args, **kwargs):
158             print(*args, **kwargs)
159
160     else:
161         verboseprint = lambda *a, **k: None  # do-nothing function
162
163     verboseprint("Using MR address: " + mr_host + ":" + str(mr_port) + " and topic: " + mr_topic)
164     verboseprint("Using SDNR address: " + sdnr_host + ":" + str(sdnr_port))
165     verboseprint("Starting with " + str(pollTime) + " seconds between polls")
166     mr_address = mr_host + ":" + str(mr_port) + MR_PATH.replace("[TOPIC]", mr_topic)
167     sdnr_address = sdnr_host + ":" + str(sdnr_port)
168
169     poll_and_handle_messages(mr_address, sdnr_address, sdnr_user, sdnr_pwd)