--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2021 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+"""
+Handles subscription messages from enbs and gnbs through rmr.
+"""
+
+import json
+from ricxappframe.xapp_frame import RMRXapp, rmr
+from ..utils.constants import Constants
+from ._BaseHandler import _BaseHandler
+from ..manager.SdlManager import SdlManager
+class SubscriptionHandler(_BaseHandler):
+
+ def __init__(self, rmr_xapp: RMRXapp, msgtype):
+ super().__init__(rmr_xapp, msgtype)
+
+
+ def request_handler(self, rmr_xapp, summary, sbuf):
+ """
+ Handles subscription messages.
+
+ Parameters
+ ----------
+ rmr_xapp: rmr Instance Context
+
+ summary: dict (required)
+ buffer content
+
+ sbuf: str (required)
+ length of the message
+ """
+ self._rmr_xapp.rmr_free(sbuf)
+ try:
+ req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes
+ self.logger.debug("SubscriptionHandler.resp_handler:: Handler processing request")
+ except (json.decoder.JSONDecodeError, KeyError):
+ self.logger.error("Subscription.resp_handler:: Handler failed to parse request")
+ return
+
+ if self.verifySubscription(req):
+ self.logger.info("SubscriptionHandler.resp_handler:: Handler processed request: {}".format(req))
+ else:
+ self.logger.error("SubscriptionHandler.resp_handler:: Request verification failed: {}".format(req))
+ return
+ self.logger.debug("SubscriptionHandler.resp_handler:: Request verification success: {}".format(req))
+
+
+ def verifySubscription(self, req: dict):
+ for i in ["subscription_id", "message"]:
+ if i not in req:
+ return False
+ return True
+
+
+
+
+
+
+
+
+
+
from os import getenv
from ricxappframe.xapp_frame import RMRXapp, rmr
+
+from .handler.SubscriptionHandler import SubscriptionHandler
+from .manager.MetricManager import MetricManager
+from .manager.SubscriptionManager import SubscriptionManager
from .utils.constants import Constants
from .manager import *
+
from .handler import *
from mdclogpy import Logger
sdl_mgr.sdlGetGnbList()
a1_mgr = A1PolicyManager(rmr_xapp)
a1_mgr.startup()
+ sub_mgr = SubscriptionManager(rmr_xapp)
+ enb_list = sub_mgr.get_enb_list()
+ for enb in enb_list:
+ sub_mgr.send_subscription_request(enb)
+ gnb_list = sub_mgr.get_gnb_list()
+ for gnb in gnb_list:
+ sub_mgr.send_subscription_request(gnb)
+ metric_mgr = MetricManager(rmr_xapp)
+ metric_mgr.send_metric()
def _handle_config_change(self, rmr_xapp, config):
"""
"""
HealthCheckHandler(self._rmr_xapp, Constants.RIC_HEALTH_CHECK_REQ)
A1PolicyHandler(self._rmr_xapp, Constants.A1_POLICY_REQ)
+ SubscriptionHandler(self._rmr_xapp,Constants.SUBSCRIPTION_REQ)
def start(self, thread=False):
"""
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2021 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+from ricxappframe.xapp_frame import RMRXapp
+from ricxappframe.metric import metric
+from ._BaseManager import _BaseManager
+from datetime import datetime
+
+# noinspection PyProtectedMember,PyProtectedMember
+class MetricManager(_BaseManager):
+
+ def __init__(self, rmr_xapp: RMRXapp):
+ super().__init__(rmr_xapp)
+ self.metric_mgr = metric.MetricsManager(self._rmr_xapp._mrc, "system-time", "hw-python")
+
+ def send_metric(self):
+
+ # datetime object containing current date and time
+ now = datetime.now()
+ dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
+ metric_list = [dt_string]
+ self.logger.info("MetricManager:: metric time {}".format(metric_list))
+ self.metric_mgr.send_metrics(metric_list)
+ self.logger.info("MetricManager:: metric sent")
+
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2021 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+"""
+
+"""
+
+import requests
+from ricxappframe.xapp_frame import RMRXapp
+import json
+from ..utils.constants import Constants
+from ._BaseManager import _BaseManager
+from swagger_client import api_client
+
+class SubscriptionManager(_BaseManager):
+
+ __namespace = "e2Manager"
+
+ def __init__(self, rmr_xapp: RMRXapp):
+ super().__init__(rmr_xapp)
+ _client = api_client()
+
+ def get_gnb_list(self):
+ gnblist = self._rmr_xapp.get_list_gnb_ids() # yet to come in library
+ self.logger.info("SubscriptionManager.getGnbList:: Processed request: {}".format(json.dumps(gnblist)))
+ return gnblist
+
+ def get_enb_list(self):
+ enblist = self._rmr_xapp.get_list_enb_ids() # yet to come in library
+ self.logger.info("SubscriptionManager.sdlGetGnbList:: Handler processed request: {}".format(json.dumps(enblist)))
+ return enblist
+
+ def send_subscription_request(self,xnb_id):
+ subscription_request = {"xnb_id": xnb_id, "action_type": Constants.ACTION_TYPE}
+ try:
+ json_object = json.dumps(subscription_request,indent=4)
+ except TypeError:
+ print("Unable to serialize the object")
+ url = Constants.SUBSCRIPTION_PATH.format(Constants.PLT_NAMESPACE,
+ Constants.SUBSCRIPTION_SERVICE,
+ Constants.SUBSCRIPTION_PORT)
+ try:
+ response = requests.post(url , json=json_object)
+ response.raise_for_status()
+ except requests.exceptions.HTTPError as err_h:
+ return "An Http Error occurred:" + repr(err_h)
+ except requests.exceptions.ConnectionError as err_c:
+ return "An Error Connecting to the API occurred:" + repr(err_c)
+ except requests.exceptions.Timeout as err_t:
+ return "A Timeout Error occurred:" + repr(err_t)
+ except requests.exceptions.RequestException as err:
+ return "An Unknown Error occurred" + repr(err)
+
+
+
+
+
+
+
A1_POLICY_REQ = 20010
A1_POLICY_RESP = 20011
RIC_ALARM_UPDATE = 110
+ ACTION_TYPE = "REPORT"
+ SUBSCRIPTION_PATH = "http://service-{}-{}-http:{}"
+ PLT_NAMESPACE = "ricplt"
+ SUBSCRIPTION_SERVICE = "submgr"
+ SUBSCRIPTION_PORT = "3800"
+ SUBSCRIPTION_REQ = 12011