import json
from threading import Thread
from rmr import rmr, helpers
-from a1 import get_module_logger
+from mdclogpy import Logger
from a1 import data
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
-logger = get_module_logger(__name__)
+mdc_logger = Logger(name=__name__)
-RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
+RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
+
+
+A1_POLICY_REQUEST = 20010
+A1_POLICY_RESPONSE = 20011
+A1_POLICY_QUERY = 20012
+
# Note; yes, globals are bad, but this is a private (to this module) global
# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
if init_func_override:
self.mrc = init_func_override()
else:
- logger.debug("Waiting for rmr to initialize..")
+ mdc_logger.debug("Waiting for rmr to initialize..")
# rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
# internal ring of messages, and receive calls read from that
# currently the size is 2048 messages, so this is fine for the foreseeable future
time.sleep(0.5)
# set the receive function
- self.rcv_func = rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024])
+ # TODO: when policy query is implemented, add A1_POLICY_QUERY
+ self.rcv_func = (
+ rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+ )
# start the work loop
self.thread = Thread(target=self.loop)
- clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
"""
# loop forever
- logger.debug("Work loop starting")
+ mdc_logger.debug("Work loop starting")
while self.keep_going:
# send out all messages waiting for us
pay = work_item["payload"].encode("utf-8")
for _ in range(0, RETRY_TIMES):
# Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
- sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"])
+ sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
+ # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
+ sbuf.contents.sub_id = work_item["ptid"]
pre_send_summary = rmr.message_summary(sbuf)
sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send
post_send_summary = rmr.message_summary(sbuf)
- logger.debug("Pre-send summary: %s, Post-send summary: %s", pre_send_summary, post_send_summary)
+ mdc_logger.debug(
+ "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
+ )
rmr.rmr_free_msg(sbuf) # free
if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
- logger.debug("Message sent successfully!")
+ mdc_logger.debug("Message sent successfully!")
break
# read our mailbox and update statuses
- updated_instances = set()
for msg in self.rcv_func():
try:
pay = json.loads(msg["payload"])
pti = pay["policy_type_id"]
pii = pay["policy_instance_id"]
- data.set_status(pti, pii, pay["handler_id"], pay["status"])
- updated_instances.add((pti, pii))
- except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
+ data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
+ except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
# TODO: in the future we may also have to catch SDL errors
- logger.debug(("Dropping malformed or non applicable message", msg))
-
- # for all updated instances, see if we can trigger a delete
- # should be no catch needed here, since the status update would have failed if it was a bad pair
- for ut in updated_instances:
- data.clean_up_instance(ut[0], ut[1])
+ mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
# TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
self.last_ran = time.time()