A1 Mediator enhancements for A1-EI 33/6133/5
authorsubhash kumar singh <subh.singh@samsung.com>
Tue, 25 May 2021 06:59:23 +0000 (06:59 +0000)
committersubhash kumar singh <subh.singh@samsung.com>
Tue, 25 May 2021 10:05:39 +0000 (15:35 +0530)
Implementation to support A1-EI interface defined at nonRT RIC (ECS
service). This process include three steps :

* query A1-EI types
* create A1-EI Job
* handle data delivered from A1-EI producer

Issue-ID: RIC-129
Change-Id: I552569b01e8d31c056b336bafe787f7c380aace5
Signed-off-by: subhash kumar singh <subh.singh@samsung.com>
Dockerfile
a1/a1rmr.py
a1/controller.py
a1/messages.py
a1/openapi.yaml

index e160d30..8fc5855 100644 (file)
@@ -46,6 +46,7 @@ USER a1user
 # Speed hack; we install gevent before anything because when building repeatedly (eg during dev)
 # and only changing a1 code, we do not need to keep compiling gevent which takes forever
 RUN pip install --user gevent
+RUN pip install --user requests
 
 COPY setup.py /home/a1user/
 COPY a1/ /home/a1user/a1
index 050674c..5a300e8 100644 (file)
@@ -21,6 +21,7 @@ import os
 import queue
 import time
 import json
+import requests
 from threading import Thread
 from ricxappframe.rmr import rmr, helpers
 from mdclogpy import Logger
@@ -39,6 +40,14 @@ RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 A1_POLICY_QUERY = 20012
+A1_EI_QUERY_ALL = 20013
+AI_EI_QUERY_ALL_RESP = 20014
+A1_EI_CREATE_JOB = 20015
+A1_EI_CREATE_JOB_RESP = 20016
+A1_EI_DATA_DELIVERY = 20017
+ECS_SERVICE_HOST = os.environ.get("ECS_SERVICE_HOST", "http://ecs-service:8083")
+ESC_EI_TYPE_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eitypes"
+ECS_EI_JOB_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eijobs/"
 
 
 # Note; yes, globals are bad, but this is a private (to this module) global
@@ -75,6 +84,8 @@ class _RmrLoop:
 
         # see docs/overview#resiliency for a discussion of this
         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+        # queue for data delivery item
+        self.ei_job_result_queue = queue.Queue()
 
         # intialize rmr context
         if init_func_override:
@@ -92,7 +103,7 @@ class _RmrLoop:
         self.rcv_func = (
             rcv_func_override
             if rcv_func_override
-            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
+            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB])
         )
 
         # start the work loop
@@ -155,6 +166,18 @@ class _RmrLoop:
             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
 
+        # now send all the ei-job related data
+        while not self.ei_job_result_queue.empty():
+            mdc_logger.debug("perform data delivery to consumer")
+
+            work_item = self.ei_job_result_queue.get(block=False, timeout=None)
+            payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8")
+            ei_job_id = int(payload.get("ei_job_id"))
+            mdc_logger.debug("data-delivery: {}".format(payload))
+
+            # send the payload to consumer subscribed for ei_job_id
+            self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id)
+
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
@@ -210,6 +233,43 @@ class _RmrLoop:
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
                         mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
 
+                elif mtype == A1_EI_QUERY_ALL:
+                    mdc_logger.debug("Received messaage {0}".format(msg))
+
+                    # query A1-EI co-ordinator service to get the EI-types
+                    resp = requests.get(ESC_EI_TYPE_PATH)
+                    if resp.status_code != 200:
+                        mdc_logger.warning("Received no reponse from A1-EI service")
+
+                    mdc_logger.debug("response from A1-EI service : {0}".format(resp.json()))
+
+                    # send the complete list of EI-types to xApp
+                    sbuf = self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP)
+
+                elif mtype == A1_EI_CREATE_JOB:
+                    mdc_logger.debug("Received message {0}".format(msg))
+                    payload = json.loads(msg[rmr.RMR_MS_PAYLOAD])
+                    mdc_logger.debug("Payload: {0}".format(payload))
+
+                    uuidStr = payload["job-id"]
+                    del payload["job-id"]
+
+                    mdc_logger.debug("Payload after removing job-id: {0}".format(payload))
+
+                    # 1. send request to A1-EI Service to create A1-EI JOB
+                    headers = {'Content-type': 'application/json'}
+                    r = requests.put(ECS_EI_JOB_PATH + uuidStr, data=json.dumps(payload), headers=headers)
+                    if (r.status_code != 201) and (r.status_code != 200):
+                        mdc_logger.warning("failed to create EIJOB : {0}".format(r))
+                    else:
+                        # 2. inform xApp for Job status
+                        mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuidStr))
+                        rmr_data = """{{
+                                "ei_job_id": "{id}"
+                                }}""".format(id=uuidStr)
+                        mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data))
+                        sbuf = self._rts_msg(str.encode(rmr_data), sbuf, A1_EI_CREATE_JOB_RESP)
+
                 else:
                     mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
 
@@ -259,6 +319,15 @@ def queue_instance_send(item):
     __RMR_LOOP__.instance_send_queue.put(item)
 
 
+def queue_ei_job_result(item):
+    """
+    push an item into the ei_job_queue
+    """
+    mdc_logger.debug("before queue {0}".format(item))
+    __RMR_LOOP__.ei_job_result_queue.put(item)
+    mdc_logger.debug("after queue")
+
+
 def healthcheck_rmr_thread(seconds=30):
     """
     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
index e00946a..24c9e8e 100644 (file)
@@ -201,3 +201,21 @@ def delete_policy_instance(policy_type_id, policy_instance_id):
         return "", 202
 
     return _try_func_return(delete_instance_handler)
+
+
+# data delivery
+
+
+def data_delivery():
+    """
+    Handle data delivery /data-delivery
+    """
+
+    def data_delivery_handler():
+        mdc_logger.debug("data: {}".format(connexion.request.json))
+        ei_job_result_json = connexion.request.json
+        mdc_logger.debug("jobid: {}".format(ei_job_result_json.get("job")))
+        a1rmr.queue_ei_job_result((ei_job_result_json.get("job"), ei_job_result_json))
+        return "", 200
+
+    return _try_func_return(data_delivery_handler)
index fc22bcb..3663053 100644 (file)
@@ -29,3 +29,13 @@ def a1_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
         "policy_instance_id": policy_instance_id,
         "payload": payload,
     }
+
+
+def ei_to_handler(ei_job_id, payload=None):
+    """
+    used to create the payloads that get sent to downstream policy handlers
+    """
+    return {
+        "ei_job_id": ei_job_id,
+        "payload": payload,
+    }
index 2aa7a94..9c90599 100644 (file)
@@ -309,6 +309,30 @@ paths:
         '503':
           description: "Potentially transient backend database error. Client should attempt to retry later."
 
+  '/data-delivery':
+
+    post:
+      description: >
+        Deliver data produced by data producer.
+      tags:
+        - A1 EI Data Delivery
+      operationId: a1.controller.data_delivery
+      requestBody:
+        content:
+          application/json:
+            schema:
+              type: object
+              description: >
+                object to represent data object
+      responses:
+        '200':
+          description: >
+            successfully delivered data from data producer
+
+        '404':
+          description: >
+            no job id defined for this data delivery
+
 components:
   schemas:
     policy_type_schema: