+topic_write=""
+topic_read=""
+
+uploader_thread=None
+downloader_thread=None
+
+# Function to download messages from dmaap
+def dmaap_uploader():
+ global msg_requests
+ global cntr_msg_requests_fetched
+
+ print("Starting uploader")
+
+ headers = {'Content-type': 'application/json', 'Accept': '*/*'}
+ #url="http://"+topic_host+"/events/"+topic_read
+ url=topic_read
+
+ while True:
+ while (len(msg_requests)>0):
+ msg=msg_requests[0]
+ if msg is not None:
+ try:
+ print("Sending to dmaap : "+ url)
+ print("Sending to dmaap : "+ msg)
+ resp=requests.post(url, data=msg, headers=headers, timeout=10)
+ if (resp.status_code<199 & resp.status_code > 299):
+ print("Failed, response code: " + str(resp.status_code))
+ sleep(1)
+ else:
+ print("Dmaap response code: " + str(resp.status_code))
+ print("Dmaap response text: " + str(resp.text))
+ with lock:
+ msg_requests.pop(0)
+ cntr_msg_requests_fetched += 1
+ except Exception as e:
+ print("Failed, exception: "+ str(e))
+ sleep(1)
+ sleep(0.01)
+
+
+# Function to download messages from dmaap
+def dmaap_downloader():
+ global msg_responses
+ global cntr_msg_responses_submitted
+
+ print("Starting uploader")
+
+ while True:
+
+ try :
+ #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
+ url=topic_write
+ headers = {'Accept': 'application/json'}
+ print("Reading from dmaap: " + url)
+ resp=requests.get(url, headers=headers)
+ if (resp.status_code<199 & resp.status_code > 299):
+ print("Failed, response code: " + resp.status_code)
+ sleep(1)
+ else:
+ print("Recieved data from dmaap mr")
+ try:
+ data=resp.json()
+ print("Recieved data (raw): " + str(resp.text))
+ if isinstance(data, list):
+ for item in data:
+ item=json.loads(item)
+ corrid=str(item["correlationId"])
+ status=str(item["status"])
+ msg=str(item["message"])
+ item_str=msg+status[0:3]
+ with lock:
+ msg_responses[corrid]=item_str
+ cntr_msg_responses_submitted += 1
+ else:
+ print("Data from dmaap is not json array: " + str(resp.text))
+ sleep(1)
+ except Exception as e:
+ print("Corrupt data from dmaap mr - dropping " + str(data))
+ print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc())
+ sleep(1)
+ except Exception as e:
+ sleep(1)
+