from flask import Flask
from flask import Response
+from flask_httpauth import HTTPBasicAuth
import json
import os
import random
# Stores the ID of the O-DU and randomly, after between 0 and 10 seconds, sends an Alarm Notification that clears the
# "CUS Link Failure" alarm event to MR.
app = Flask(__name__)
+auth = HTTPBasicAuth()
mr_host = "http://localhost"
mr_port = "3904"
# Server info
HOST_IP = "::"
HOST_PORT = 9990
-APP_URL = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=<string:o_du_id>/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection=<string:o_ru_id>"
+APP_URL = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=<string:o_du_id>/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions=<string:o_du_id2>/radio-resource-management-policy-ratio=rrm-pol-1"
+
+USERNAME = "admin"
+PASSWORD = "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"
FAULT_ID = "28"
class AlarmClearThread (threading.Thread):
- def __init__(self, sleep_time, o_ru_id):
+ def __init__(self, sleep_time, o_du_id):
threading.Thread.__init__(self)
self.sleep_time = sleep_time
- self.o_ru_id = o_ru_id
+ self.o_du_id = o_du_id
def run(self):
- print(f'Sleeping: {self.sleep_time} before clearing O-DU: {self.o_ru_id}')
+ print(f'Sleeping: {self.sleep_time} before clearing O-DU: {self.o_du_id}')
time.sleep(self.sleep_time)
msg_as_json = json.loads(json.dumps(linkFailureMessage))
- msg_as_json["event"]["commonEventHeader"]["sourceName"] = self.o_ru_id
- print("Sedning alarm clear for O-RU: " + self.o_ru_id)
+ msg_as_json["event"]["commonEventHeader"]["sourceName"] = self.o_du_id
+ print("Sedning alarm clear for O-DU: " + self.o_du_id)
requests.post(mr_host + ":" + mr_port + MR_PATH, json=msg_as_json);
return 'OK', 200
+@auth.verify_password
+def verify_password(username, password):
+ if username == USERNAME and password == PASSWORD:
+ return username
+
+
@app.route(APP_URL,
- methods=['POST'])
-def sendrequest(o_du_id, o_ru_id):
- print("Got request with O-DU ID: " + o_du_id + " and O-RU ID: " + o_ru_id)
+ methods=['PUT'])
+@auth.login_required
+def sendrequest(o_du_id, o_du_id2):
+ print("Got request with O-DU ID: " + o_du_id)
random_time = int(10 * random.random())
- alarm_clear_thread = AlarmClearThread(random_time, o_ru_id)
+ alarm_clear_thread = AlarmClearThread(random_time, o_du_id)
alarm_clear_thread.start()
return Response(status=200)