self.xaction = uuid.uuid1().hex.encode("utf-8")
self.sub_id = 0
self.tp_state = 0
+ self.meid = None
def __str__(self):
return str(
"xaction": self.xaction,
"sub_id": self.sub_id,
"tp_state": self.tp_state,
+ "meid": self.meid,
}
)
Patch rmr; requires a monkeypatch (pytest) object to be passed in
"""
- def fake_alloc(_unused, _alsounused):
- return Rmr_mbuf_t()
+ def fake_alloc(
+ _vctx, _sz, payload=None, gen_transaction_id=False, mtype=None, meid=None, sub_id=None, fixed_transaction_id=None
+ ):
+ sbuf = Rmr_mbuf_t()
+ if payload:
+ sbuf.contents.payload = payload
+
+ if fixed_transaction_id:
+ sbuf.contents.xaction = fixed_transaction_id
+ elif gen_transaction_id:
+ sbuf.contents.xaction = uuid.uuid1().hex.encode("utf-8")
+
+ if mtype:
+ sbuf.contents.mtype = mtype
+
+ if meid:
+ sbuf.contents.meid = meid
+
+ if sub_id:
+ sbuf.contents.sub_id = sub_id
+
+ return sbuf
def fake_set_payload_and_length(payload, sbuf):
sbuf.contents.payload = payload
def fake_get_payload(sbuf):
return sbuf.contents.payload
- def fake_get_meid(_sbuf):
- return None # this is not a part of rmr_mbuf_t
+ def fake_get_meid(sbuf):
+ return sbuf.contents.meid
def fake_get_src(_sbuf):
return "localtest:80" # this is not a part of rmr_mbuf_t