MDC Dynamic log level changes for a1mediator
[ric-plt/a1.git] / a1 / a1rmr.py
index 050674c..e4a7448 100644 (file)
@@ -21,13 +21,15 @@ import os
 import queue
 import time
 import json
 import queue
 import time
 import json
+import requests
 from threading import Thread
 from ricxappframe.rmr import rmr, helpers
 from mdclogpy import Logger
 from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 from threading import Thread
 from ricxappframe.rmr import rmr, helpers
 from mdclogpy import Logger
 from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
-mdc_logger = Logger(name=__name__)
+mdc_logger = Logger()
+mdc_logger.mdclog_format_init(configmap_monitor=True)
 
 
 # With Nanomsg and NNG it was possible for a send attempt to have a "soft"
 
 
 # With Nanomsg and NNG it was possible for a send attempt to have a "soft"
@@ -39,6 +41,14 @@ RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 A1_POLICY_QUERY = 20012
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 A1_POLICY_QUERY = 20012
+A1_EI_QUERY_ALL = 20013
+AI_EI_QUERY_ALL_RESP = 20014
+A1_EI_CREATE_JOB = 20015
+A1_EI_CREATE_JOB_RESP = 20016
+A1_EI_DATA_DELIVERY = 20017
+ECS_SERVICE_HOST = os.environ.get("ECS_SERVICE_HOST", "http://ecs-service:8083")
+ESC_EI_TYPE_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eitypes"
+ECS_EI_JOB_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eijobs/"
 
 
 # Note; yes, globals are bad, but this is a private (to this module) global
 
 
 # Note; yes, globals are bad, but this is a private (to this module) global
@@ -75,6 +85,8 @@ class _RmrLoop:
 
         # see docs/overview#resiliency for a discussion of this
         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
         # see docs/overview#resiliency for a discussion of this
         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+        # queue for data delivery item
+        self.ei_job_result_queue = queue.Queue()
 
         # intialize rmr context
         if init_func_override:
 
         # intialize rmr context
         if init_func_override:
@@ -92,7 +104,7 @@ class _RmrLoop:
         self.rcv_func = (
             rcv_func_override
             if rcv_func_override
         self.rcv_func = (
             rcv_func_override
             if rcv_func_override
-            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
+            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB])
         )
 
         # start the work loop
         )
 
         # start the work loop
@@ -155,6 +167,18 @@ class _RmrLoop:
             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
 
             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
 
+        # now send all the ei-job related data
+        while not self.ei_job_result_queue.empty():
+            mdc_logger.debug("perform data delivery to consumer")
+
+            work_item = self.ei_job_result_queue.get(block=False, timeout=None)
+            payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8")
+            ei_job_id = int(work_item[0])
+            mdc_logger.debug("data-delivery: {}".format(payload))
+
+            # send the payload to consumer subscribed for ei_job_id
+            self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id)
+
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
@@ -210,6 +234,43 @@ class _RmrLoop:
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
                         mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
 
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
                         mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
 
+                elif mtype == A1_EI_QUERY_ALL:
+                    mdc_logger.debug("Received messaage {0}".format(msg))
+
+                    # query A1-EI co-ordinator service to get the EI-types
+                    resp = requests.get(ESC_EI_TYPE_PATH)
+                    if resp.status_code != 200:
+                        mdc_logger.warning("Received no reponse from A1-EI service")
+
+                    mdc_logger.debug("response from A1-EI service : {0}".format(resp.json()))
+
+                    # send the complete list of EI-types to xApp
+                    sbuf = self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP)
+
+                elif mtype == A1_EI_CREATE_JOB:
+                    mdc_logger.debug("Received message {0}".format(msg))
+                    payload = json.loads(msg[rmr.RMR_MS_PAYLOAD])
+                    mdc_logger.debug("Payload: {0}".format(payload))
+
+                    uuidStr = payload["job-id"]
+                    del payload["job-id"]
+
+                    mdc_logger.debug("Payload after removing job-id: {0}".format(payload))
+
+                    # 1. send request to A1-EI Service to create A1-EI JOB
+                    headers = {'Content-type': 'application/json'}
+                    r = requests.put(ECS_EI_JOB_PATH + uuidStr, data=json.dumps(payload), headers=headers)
+                    if (r.status_code != 201) and (r.status_code != 200):
+                        mdc_logger.warning("failed to create EIJOB : {0}".format(r))
+                    else:
+                        # 2. inform xApp for Job status
+                        mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuidStr))
+                        rmr_data = """{{
+                                "ei_job_id": "{id}"
+                                }}""".format(id=uuidStr)
+                        mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data))
+                        sbuf = self._rts_msg(str.encode(rmr_data), sbuf, A1_EI_CREATE_JOB_RESP)
+
                 else:
                     mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
 
                 else:
                     mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
 
@@ -259,6 +320,14 @@ def queue_instance_send(item):
     __RMR_LOOP__.instance_send_queue.put(item)
 
 
     __RMR_LOOP__.instance_send_queue.put(item)
 
 
+def queue_ei_job_result(item):
+    """
+    push an item into the ei_job_queue
+    """
+    mdc_logger.debug("queuing data delivery item {0}".format(item))
+    __RMR_LOOP__.ei_job_result_queue.put(item)
+
+
 def healthcheck_rmr_thread(seconds=30):
     """
     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
 def healthcheck_rmr_thread(seconds=30):
     """
     returns a boolean representing whether the rmr loop is healthy, by checking two attributes: