import queue
import time
import json
+import requests
from threading import Thread
from ricxappframe.rmr import rmr, helpers
from mdclogpy import Logger
A1_POLICY_REQUEST = 20010
A1_POLICY_RESPONSE = 20011
A1_POLICY_QUERY = 20012
+A1_EI_QUERY_ALL = 20013
+AI_EI_QUERY_ALL_RESP = 20014
+A1_EI_CREATE_JOB = 20015
+A1_EI_CREATE_JOB_RESP = 20016
+A1_EI_DATA_DELIVERY = 20017
+ECS_SERVICE_HOST = os.environ.get("ECS_SERVICE_HOST", "http://ecs-service:8083")
+ESC_EI_TYPE_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eitypes"
+ECS_EI_JOB_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eijobs/"
# Note; yes, globals are bad, but this is a private (to this module) global
# see docs/overview#resiliency for a discussion of this
self.instance_send_queue = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html
+ # queue for data delivery item
+ self.ei_job_result_queue = queue.Queue()
# intialize rmr context
if init_func_override:
self.rcv_func = (
rcv_func_override
if rcv_func_override
- else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
+ else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB])
)
# start the work loop
payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+ # now send all the ei-job related data
+ while not self.ei_job_result_queue.empty():
+ mdc_logger.debug("perform data delivery to consumer")
+
+ work_item = self.ei_job_result_queue.get(block=False, timeout=None)
+ payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8")
+ ei_job_id = int(payload.get("ei_job_id"))
+ mdc_logger.debug("data-delivery: {}".format(payload))
+
+ # send the payload to consumer subscribed for ei_job_id
+ self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id)
+
def loop(self):
"""
This loop runs forever, and has 3 jobs:
except (KeyError, TypeError, json.decoder.JSONDecodeError):
mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
+ elif mtype == A1_EI_QUERY_ALL:
+ mdc_logger.debug("Received messaage {0}".format(msg))
+
+ # query A1-EI co-ordinator service to get the EI-types
+ resp = requests.get(ESC_EI_TYPE_PATH)
+ if resp.status_code != 200:
+ mdc_logger.warning("Received no reponse from A1-EI service")
+
+ mdc_logger.debug("response from A1-EI service : {0}".format(resp.json()))
+
+ # send the complete list of EI-types to xApp
+ sbuf = self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP)
+
+ elif mtype == A1_EI_CREATE_JOB:
+ mdc_logger.debug("Received message {0}".format(msg))
+ payload = json.loads(msg[rmr.RMR_MS_PAYLOAD])
+ mdc_logger.debug("Payload: {0}".format(payload))
+
+ uuidStr = payload["job-id"]
+ del payload["job-id"]
+
+ mdc_logger.debug("Payload after removing job-id: {0}".format(payload))
+
+ # 1. send request to A1-EI Service to create A1-EI JOB
+ headers = {'Content-type': 'application/json'}
+ r = requests.put(ECS_EI_JOB_PATH + uuidStr, data=json.dumps(payload), headers=headers)
+ if (r.status_code != 201) and (r.status_code != 200):
+ mdc_logger.warning("failed to create EIJOB : {0}".format(r))
+ else:
+ # 2. inform xApp for Job status
+ mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuidStr))
+ rmr_data = """{{
+ "ei_job_id": "{id}"
+ }}""".format(id=uuidStr)
+ mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data))
+ sbuf = self._rts_msg(str.encode(rmr_data), sbuf, A1_EI_CREATE_JOB_RESP)
+
else:
mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
__RMR_LOOP__.instance_send_queue.put(item)
+def queue_ei_job_result(item):
+ """
+ push an item into the ei_job_queue
+ """
+ mdc_logger.debug("before queue {0}".format(item))
+ __RMR_LOOP__.ei_job_result_queue.put(item)
+ mdc_logger.debug("after queue")
+
+
def healthcheck_rmr_thread(seconds=30):
"""
returns a boolean representing whether the rmr loop is healthy, by checking two attributes: