-import tempfile
-import os
-from rmr.rmr_mocks import rmr_mocks
-from a1 import app
-from a1 import exceptions
-from rmr import rmr
-import testing_helpers
-import pytest
-
-# http://flask.pocoo.org/docs/1.0/testing/
-@pytest.fixture
-def client():
- db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
- app.app.config["TESTING"] = True
- cl = app.app.test_client()
-
- yield cl
-
- os.close(db_fd)
- os.unlink(app.app.config["DATABASE"])
-
-
-def _fake_dequeue(
- monkeypatch,
- msg_payload={"status": "SUCCESS", "foo": "bar"},
- msg_type=20001,
- msg_state=0,
- jsonb=True,
- unexpected_first=True,
-):
- """
- generates a mock rmr message response (returns a function that does; uses closures to set params)
- """
- new_messages = []
- # stick a message we don't want at the front of the queue, then stick the message we want
- if unexpected_first:
- monkeypatch.setattr("rmr.rmr.rmr_torcv_msg", rmr_mocks.rcv_mock_generator(msg_payload, -1, msg_state, jsonb))
- sbuf = rmr.rmr_alloc_msg(None, None)
- sbuf = rmr.rmr_torcv_msg(None, sbuf, None)
- summary = rmr.message_summary(sbuf)
- new_messages.append(summary)
-
- monkeypatch.setattr("rmr.rmr.rmr_torcv_msg", rmr_mocks.rcv_mock_generator(msg_payload, msg_type, msg_state, jsonb))
- sbuf = rmr.rmr_alloc_msg(None, None)
- sbuf = rmr.rmr_torcv_msg(None, sbuf, None)
- summary = rmr.message_summary(sbuf)
- new_messages.append(summary)
-
- def f():
- return new_messages
-
- return f
+import time
+import json
+from ricxappframe.rmr.rmr_mocks import rmr_mocks
+from ricxappframe.xapp_sdl import SDLWrapper
+from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
+from a1 import a1rmr, data
+
+RCV_ID = "test_receiver"
+ADM_CRTL_TID = 6660666
+ADM_CTRL_IID = "admission_control_policy"
+ADM_CTRL_POLICIES = "/a1-p/policytypes/{0}/policies".format(ADM_CRTL_TID)
+ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL_IID
+ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
+ADM_CTRL_TYPE = "/a1-p/policytypes/{0}".format(ADM_CRTL_TID)
+ACK_MT = 20011
+
+
+def _fake_dequeue():
+ """for monkeypatching with a good status"""
+ pay = json.dumps(
+ {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "OK"}
+ ).encode()
+ fake_msg = {"payload": pay, "message type": ACK_MT}
+ return [(fake_msg, None)]
+
+
+def _fake_dequeue_none():
+ """for monkeypatching with no waiting messages"""
+ return []
+
+
+def _fake_dequeue_deleted():
+ """for monkeypatching with a DELETED status"""
+ new_msgs = []
+ good_pay = json.dumps(
+ {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
+ ).encode()
+
+ # non existent type id
+ pay = json.dumps(
+ {"policy_type_id": 911, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
+ ).encode()
+ fake_msg = {"payload": pay, "message type": ACK_MT}
+ new_msgs.append((fake_msg, None))
+
+ # bad instance id
+ pay = json.dumps(
+ {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": "darkness", "handler_id": RCV_ID, "status": "DELETED"}
+ ).encode()
+ fake_msg = {"payload": pay, "message type": ACK_MT}
+ new_msgs.append((fake_msg, None))
+
+ # good body but bad message type
+ fake_msg = {"payload": good_pay, "message type": ACK_MT * 3}
+ new_msgs.append((fake_msg, None))
+
+ # insert a bad one with a malformed body to make sure we keep going
+ new_msgs.append(({"payload": "asdf", "message type": ACK_MT}, None))
+
+ # not even a json
+ new_msgs.append(("asdf", None))
+
+ # good
+ fake_msg = {"payload": good_pay, "message type": ACK_MT}
+ new_msgs.append((fake_msg, None))
+
+ return new_msgs