From: subhash kumar singh Date: Tue, 25 May 2021 06:59:23 +0000 (+0000) Subject: A1 Mediator enhancements for A1-EI X-Git-Tag: 2.5.0~6 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=8b663678b683be4f695bb47509b8cbbc2ec6da30;p=ric-plt%2Fa1.git A1 Mediator enhancements for A1-EI Implementation to support A1-EI interface defined at nonRT RIC (ECS service). This process include three steps : * query A1-EI types * create A1-EI Job * handle data delivered from A1-EI producer Issue-ID: RIC-129 Change-Id: I552569b01e8d31c056b336bafe787f7c380aace5 Signed-off-by: subhash kumar singh --- diff --git a/Dockerfile b/Dockerfile index e160d30..8fc5855 100644 --- a/Dockerfile +++ b/Dockerfile @@ -46,6 +46,7 @@ USER a1user # Speed hack; we install gevent before anything because when building repeatedly (eg during dev) # and only changing a1 code, we do not need to keep compiling gevent which takes forever RUN pip install --user gevent +RUN pip install --user requests COPY setup.py /home/a1user/ COPY a1/ /home/a1user/a1 diff --git a/a1/a1rmr.py b/a1/a1rmr.py index 050674c..5a300e8 100644 --- a/a1/a1rmr.py +++ b/a1/a1rmr.py @@ -21,6 +21,7 @@ import os import queue import time import json +import requests from threading import Thread from ricxappframe.rmr import rmr, helpers from mdclogpy import Logger @@ -39,6 +40,14 @@ RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4)) A1_POLICY_REQUEST = 20010 A1_POLICY_RESPONSE = 20011 A1_POLICY_QUERY = 20012 +A1_EI_QUERY_ALL = 20013 +AI_EI_QUERY_ALL_RESP = 20014 +A1_EI_CREATE_JOB = 20015 +A1_EI_CREATE_JOB_RESP = 20016 +A1_EI_DATA_DELIVERY = 20017 +ECS_SERVICE_HOST = os.environ.get("ECS_SERVICE_HOST", "http://ecs-service:8083") +ESC_EI_TYPE_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eitypes" +ECS_EI_JOB_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eijobs/" # Note; yes, globals are bad, but this is a private (to this module) global @@ -75,6 +84,8 @@ class _RmrLoop: # see docs/overview#resiliency for a discussion of this self.instance_send_queue = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html + # queue for data delivery item + self.ei_job_result_queue = queue.Queue() # intialize rmr context if init_func_override: @@ -92,7 +103,7 @@ class _RmrLoop: self.rcv_func = ( rcv_func_override if rcv_func_override - else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY]) + else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB]) ) # start the work loop @@ -155,6 +166,18 @@ class _RmrLoop: payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8") self._send_msg(payload, A1_POLICY_REQUEST, work_item[1]) + # now send all the ei-job related data + while not self.ei_job_result_queue.empty(): + mdc_logger.debug("perform data delivery to consumer") + + work_item = self.ei_job_result_queue.get(block=False, timeout=None) + payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8") + ei_job_id = int(payload.get("ei_job_id")) + mdc_logger.debug("data-delivery: {}".format(payload)) + + # send the payload to consumer subscribed for ei_job_id + self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id) + def loop(self): """ This loop runs forever, and has 3 jobs: @@ -210,6 +233,43 @@ class _RmrLoop: except (KeyError, TypeError, json.decoder.JSONDecodeError): mdc_logger.warning("Dropping malformed policy query: {0}".format(msg)) + elif mtype == A1_EI_QUERY_ALL: + mdc_logger.debug("Received messaage {0}".format(msg)) + + # query A1-EI co-ordinator service to get the EI-types + resp = requests.get(ESC_EI_TYPE_PATH) + if resp.status_code != 200: + mdc_logger.warning("Received no reponse from A1-EI service") + + mdc_logger.debug("response from A1-EI service : {0}".format(resp.json())) + + # send the complete list of EI-types to xApp + sbuf = self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP) + + elif mtype == A1_EI_CREATE_JOB: + mdc_logger.debug("Received message {0}".format(msg)) + payload = json.loads(msg[rmr.RMR_MS_PAYLOAD]) + mdc_logger.debug("Payload: {0}".format(payload)) + + uuidStr = payload["job-id"] + del payload["job-id"] + + mdc_logger.debug("Payload after removing job-id: {0}".format(payload)) + + # 1. send request to A1-EI Service to create A1-EI JOB + headers = {'Content-type': 'application/json'} + r = requests.put(ECS_EI_JOB_PATH + uuidStr, data=json.dumps(payload), headers=headers) + if (r.status_code != 201) and (r.status_code != 200): + mdc_logger.warning("failed to create EIJOB : {0}".format(r)) + else: + # 2. inform xApp for Job status + mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuidStr)) + rmr_data = """{{ + "ei_job_id": "{id}" + }}""".format(id=uuidStr) + mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data)) + sbuf = self._rts_msg(str.encode(rmr_data), sbuf, A1_EI_CREATE_JOB_RESP) + else: mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype)) @@ -259,6 +319,15 @@ def queue_instance_send(item): __RMR_LOOP__.instance_send_queue.put(item) +def queue_ei_job_result(item): + """ + push an item into the ei_job_queue + """ + mdc_logger.debug("before queue {0}".format(item)) + __RMR_LOOP__.ei_job_result_queue.put(item) + mdc_logger.debug("after queue") + + def healthcheck_rmr_thread(seconds=30): """ returns a boolean representing whether the rmr loop is healthy, by checking two attributes: diff --git a/a1/controller.py b/a1/controller.py index e00946a..24c9e8e 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -201,3 +201,21 @@ def delete_policy_instance(policy_type_id, policy_instance_id): return "", 202 return _try_func_return(delete_instance_handler) + + +# data delivery + + +def data_delivery(): + """ + Handle data delivery /data-delivery + """ + + def data_delivery_handler(): + mdc_logger.debug("data: {}".format(connexion.request.json)) + ei_job_result_json = connexion.request.json + mdc_logger.debug("jobid: {}".format(ei_job_result_json.get("job"))) + a1rmr.queue_ei_job_result((ei_job_result_json.get("job"), ei_job_result_json)) + return "", 200 + + return _try_func_return(data_delivery_handler) diff --git a/a1/messages.py b/a1/messages.py index fc22bcb..3663053 100644 --- a/a1/messages.py +++ b/a1/messages.py @@ -29,3 +29,13 @@ def a1_to_handler(operation, policy_type_id, policy_instance_id, payload=None): "policy_instance_id": policy_instance_id, "payload": payload, } + + +def ei_to_handler(ei_job_id, payload=None): + """ + used to create the payloads that get sent to downstream policy handlers + """ + return { + "ei_job_id": ei_job_id, + "payload": payload, + } diff --git a/a1/openapi.yaml b/a1/openapi.yaml index 2aa7a94..9c90599 100644 --- a/a1/openapi.yaml +++ b/a1/openapi.yaml @@ -309,6 +309,30 @@ paths: '503': description: "Potentially transient backend database error. Client should attempt to retry later." + '/data-delivery': + + post: + description: > + Deliver data produced by data producer. + tags: + - A1 EI Data Delivery + operationId: a1.controller.data_delivery + requestBody: + content: + application/json: + schema: + type: object + description: > + object to represent data object + responses: + '200': + description: > + successfully delivered data from data producer + + '404': + description: > + no job id defined for this data delivery + components: schemas: policy_type_schema: