MR_PATH = "/events/[TOPIC]/users/test/"
SDNR_PATH = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=[O-DU-ID]/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection=[O-RU-ID]"
+FAUILT_ID = "28"
UNLOCK_MESSAGE = {
"o-ran-sc-du-hello-world:du-to-ru-connection": [
link_failure = False
if (event_headers["domain"] == "fault"):
fault_fields = msg_as_json["event"]["faultFields"]
- link_failure = fault_fields["alarmCondition"] == "30" and fault_fields["eventSeverity"] != "NORMAL"
+ link_failure = fault_fields["alarmCondition"] == FAUILT_ID and fault_fields["eventSeverity"] != "NORMAL"
return link_failure
link_failure_clear = False
if (event_headers["domain"] == "fault"):
fault_fields = msg_as_json["event"]["faultFields"]
- link_failure_clear = fault_fields["alarmCondition"] == "30" and fault_fields["eventSeverity"] == "NORMAL"
+ link_failure_clear = fault_fields["alarmCondition"] == FAUILT_ID and fault_fields["eventSeverity"] == "NORMAL"
return link_failure_clear
-def handle_link_failure(message, o_ru_to_o_du_map, sdnr_address):
+def handle_link_failure(message, o_ru_to_o_du_map, sdnr_address, sdnr_user, sdnr_pwd):
verboseprint("Got a link failure: ")
alarm_msg_as_json = json.loads(message)
event_headers = alarm_msg_as_json["event"]["commonEventHeader"]
o_ru_id = event_headers["sourceName"]
verboseprint("O-RU ID: " + o_ru_id)
- o_du_id = o_ru_to_o_du_map[o_ru_id]
- verboseprint("O-DU ID: " + o_du_id)
- unlock_msg = json.loads(json.dumps(UNLOCK_MESSAGE))
- unlock_msg["o-ran-sc-du-hello-world:du-to-ru-connection"][0]["name"] = o_ru_id
- send_path = SDNR_PATH.replace("[O-DU-ID]", o_du_id).replace("[O-RU-ID]", o_ru_id)
- requests.post(sdnr_address + send_path, json=unlock_msg)
+ if o_ru_id in o_ru_to_o_du_map:
+ o_du_id = o_ru_to_o_du_map[o_ru_id]
+ verboseprint("O-DU ID: " + o_du_id)
+ unlock_msg = json.loads(json.dumps(UNLOCK_MESSAGE))
+ unlock_msg["o-ran-sc-du-hello-world:du-to-ru-connection"][0]["name"] = o_ru_id
+ send_path = SDNR_PATH.replace("[O-DU-ID]", o_du_id).replace("[O-RU-ID]", o_ru_id)
+ requests.put(sdnr_address + send_path, auth=(sdnr_user, sdnr_pwd), json=unlock_msg)
+ else:
+ print("ERROR: No mapping for O-RU ID: " + o_ru_id)
def handle_clear_link_failure(message):
return dictionary
-def poll_and_handle_messages(mr_address, sdnr_address):
+def poll_and_handle_messages(mr_address, sdnr_address, sdnr_user, sdnr_pwd):
while True:
try:
verboseprint("Polling")
messages = response.json()
for message in messages:
if (is_message_new_link_failure(message)):
- handle_link_failure(message, o_ru_to_o_du_map, sdnr_address)
+ handle_link_failure(message, o_ru_to_o_du_map, sdnr_address, sdnr_user, sdnr_pwd)
elif (is_message_clear_link_failure(message)):
handle_clear_link_failure(message)
except Exception as inst:
parser.add_argument('--mrTopic', help='The topic to poll messages from (default: %(default)s)', default="unauthenticated.SEC_FAULT_OUTPUT")
parser.add_argument('--sdnrHost', help='The URL of the SNDR host (default: %(default)s)', default="http://localhost")
parser.add_argument('--sdnrPort', help='The port of the SDNR host (default: %(default)d)', type=int, default=9990)
+ parser.add_argument('--sdnrUser', help='Username for SDNR', default="admin")
+ parser.add_argument('--sdnrPwd', help='Password for SDNR', default="Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U")
parser.add_argument('--oRuTooDuMapFile', help='A file with the mapping between O-RU ID and O-DU ID as a dictionary (default: %(default)s)', default="o-ru-to-o-du-map.txt")
parser.add_argument('--pollTime', help='The time between polls (default: %(default)d)', type=int, default=10)
parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
if os.getenv("SDNR-PORT") is not None:
sdnr_port = os.getenv("SDNR-PORT")
print("Using SNDR Host from os: " + sdnr_port)
+ sdnr_user = args["sdnrUser"]
+ if os.getenv("SDNR-USER") is not None:
+ sdnr_user = os.getenv("SDNR-USER")
+ print("Using SNDR User from os: " + sdnr_user)
+ sdnr_pwd = args["sdnrPwd"]
+ if os.getenv("SDNR-PWD") is not None:
+ sdnr_pwd = os.getenv("SDNR-PWD")
+ print("Using SNDR Password from os: " + sdnr_pwd)
o_ru_to_o_du_map = read_o_ru_to_o_du_map_from_file(args["oRuTooDuMapFile"])
pollTime = args["pollTime"]
mr_address = mr_host + ":" + str(mr_port) + MR_PATH.replace("[TOPIC]", mr_topic)
sdnr_address = sdnr_host + ":" + str(sdnr_port)
- poll_and_handle_messages(mr_address, sdnr_address)
+ poll_and_handle_messages(mr_address, sdnr_address, sdnr_user, sdnr_pwd)