1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
# Create the RMR context used by the rest of the script. PORT is defined outside
# this excerpt; it is UTF-8 encoded before being handed to rmr_init together with
# the RMR_MAX_RCV_BYTES and RMRFL_MTCALL constants from the rmr module.
27 mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
# Keep polling while rmr_ready() returns 0 for this context.
# NOTE(review): indentation is lost in this excerpt, so the extent of the loop
# body (and whether there is a delay between polls) is not visible here --
# confirm against the full file.
30 while rmr.rmr_ready(mrc) == 0:
32 print("not yet ready")
# Report the port in use; presumably reached only after the readiness loop exits.
34 print("listening ON {}".format(PORT))
# Build the query body: a single-key dict carrying the policy type id.
# test_type is defined outside this excerpt.
40 pay = {"policy_type_id": test_type}
# Allocate a message (size argument 4096) holding the JSON-encoded query, with a
# generated transaction id and message type 20012, then send it. rmr_send_msg
# returns a buffer, which replaces sbuf_send.
41 sbuf_send = rmr.rmr_alloc_msg(mrc, 4096, payload=json.dumps(pay).encode("utf-8"), gen_transaction_id=True, mtype=20012)
42 sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
# Summarize the returned buffer so the outcome of the send can be inspected.
43 post_send_summary = rmr.message_summary(sbuf_send)
# The send is treated as successful only when "message state" is 0 AND
# "message status" is "RMR_OK"; anything else is reported as a failure.
# NOTE(review): what else happens on the failure branch (and where the success
# branch begins) cannot be determined from this excerpt -- confirm in the full file.
45 if not (post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK"):
46 print("was unable to send query to a1!")
49 # query worked, wait 2 seconds, then receive everything we have
51 print("reading messages")
53 # this is a hacked up version of rmr_rcvall_msgs in the rmr package
54 # we need the actual messages, not the summaries, to use rts
55 sbuf_rcv = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status
# Receive into the preallocated buffer; the buffer returned by the call replaces
# sbuf_rcv.
# NOTE(review): the messages below suggest this runs inside a receive loop, but
# the loop header and its exit are not visible in this excerpt -- confirm.
57 sbuf_rcv = rmr.rmr_torcv_msg(mrc, sbuf_rcv, 0) # set the timeout to 0 so this doesn't block!!
# Summarize the buffer; the summary dict is what the status check, the payload
# parsing and the message-type check below all read from.
59 summary = rmr.message_summary(sbuf_rcv)
60 if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states
61 print("no more instances received. will try again in 1s")
64 print("Received: {0}".format(summary))
# Parse the JSON payload and check that it matches the query: same policy type
# id as test_type, and message type 20010.
# NOTE(review): these checks use `assert`, which Python removes when run with -O.
66 received_payload = json.loads(summary["payload"])
67 assert received_payload["policy_type_id"] == test_type
68 assert summary["message type"] == 20010
# NOTE(review): the next three lines look like entries of a dict literal whose
# opening and closing lines are missing from this excerpt; the `payload` name
# used further down is presumably that dict -- confirm against the full file.
# The visible entries echo the received policy type id and policy instance id
# and add a fixed handler id.
71 "policy_type_id": received_payload["policy_type_id"],
72 "policy_instance_id": received_payload["policy_instance_id"],
73 "handler_id": "query_tester",
# Serialize the reply and write it into the received buffer, then set that
# buffer's message type to 20011, so the same buffer can be passed to
# rmr_rts_msg below.
76 val = json.dumps(payload).encode("utf-8")
77 rmr.set_payload_and_length(val, sbuf_rcv) # TODO: extend rmr-python to allow rts to accept this param
78 sbuf_rcv.contents.mtype = 20011 # TODO: extend rmr-python to allow rts to accept this param
79 print("Pre reply summary: {}".format(rmr.message_summary(sbuf_rcv)))
# Hand the modified buffer to rmr_rts_msg and print a summary of the buffer it
# returns, which again replaces sbuf_rcv.
82 sbuf_rcv = rmr.rmr_rts_msg(mrc, sbuf_rcv)
83 post_reply_summary = rmr.message_summary(sbuf_rcv)
84 print("Post reply summary: {}".format(post_reply_summary))