+topic_write=""
+topic_read=""
+generic_topics_upload_baseurl=""
+
+uploader_thread=None
+downloader_thread=None
+generic_uploader_thread=None
+
+# Function to upload A1PMS messages to dmaap
+def dmaap_uploader():
+ global msg_requests
+ global cntr_msg_requests_fetched
+
+ print("Starting uploader")
+
+ headers = {'Content-type': 'application/json', 'Accept': '*/*'}
+ #url="http://"+topic_host+"/events/"+topic_read
+ url=topic_read
+
+ while True:
+ while (len(msg_requests)>0):
+ msg=msg_requests[0]
+ if msg is not None:
+ try:
+ print("Sending to dmaap : "+ url)
+ print("Sending to dmaap : "+ msg)
+ resp=requests.post(url, data=msg, headers=headers, timeout=10)
+ if (resp.status_code<199 & resp.status_code > 299):
+ print("Failed, response code: " + str(resp.status_code))
+ sleep(1)
+ else:
+ print("Dmaap response code: " + str(resp.status_code))
+ print("Dmaap response text: " + str(resp.text))
+ with lock:
+ msg_requests.pop(0)
+ cntr_msg_requests_fetched += 1
+ except Exception as e:
+ print("Failed, exception: "+ str(e))
+ sleep(1)
+ sleep(0.01)
+
+
+# Function to download A1PMS messages from dmaap
+def dmaap_downloader():
+ global msg_responses
+ global cntr_msg_responses_submitted
+
+ print("Starting uploader")
+
+ while True:
+
+ try :
+ #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
+ url=topic_write
+ headers = {'Accept': 'application/json'}
+ print("Reading from dmaap: " + url)
+ resp=requests.get(url, headers=headers)
+ if (resp.status_code<199 & resp.status_code > 299):
+ print("Failed, response code: " + resp.status_code)
+ sleep(1)
+ else:
+ print("Recieved data from dmaap mr")
+ try:
+ data=resp.json()
+ print("Recieved data (raw): " + str(resp.text))
+ if isinstance(data, list):
+ for item in data:
+ item=json.loads(item)
+ corrid=str(item["correlationId"])
+ status=str(item["status"])
+ msg=str(item["message"])
+ item_str=msg+status[0:3]
+ with lock:
+ msg_responses[corrid]=item_str
+ cntr_msg_responses_submitted += 1
+ else:
+ print("Data from dmaap is not json array: " + str(resp.text))
+ sleep(1)
+ except Exception as e:
+ print("Corrupt data from dmaap mr - dropping " + str(data))
+ print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc())
+ sleep(1)
+ except Exception as e:
+ sleep(1)
+
+# Function to upload generic messages to dmaap
+def dmaap_generic_uploader():
+ global msg_requests
+ global cntr_msg_requests_fetched
+
+ print("Starting generic uploader")
+
+ headers_json = {'Content-type': 'application/json', 'Accept': '*/*'}
+ headers_text = {'Content-type': 'text/plain', 'Accept': '*/*'}
+
+ while True:
+ if (len(generic_messages)):
+ keys_copy = list(generic_messages.keys())
+ for topicname in keys_copy: #topicname contains the path of the topics, eg. "/event/<topic>"
+ topic_queue=generic_messages[topicname]
+ if (len(topic_queue)>0):
+ if (topicname.endswith(".text")):
+ msg=topic_queue[0]
+ headers=headers_text
+ else:
+ msg=topic_queue[0]
+ msg=json.dumps(msg)
+ headers=headers_json
+ url=generic_topics_upload_baseurl+topicname
+ print("Sending to dmaap : "+ url)
+ print("Sending to dmaap : "+ msg)
+ print("Sending to dmaap : "+ str(headers))
+ try:
+ resp=requests.post(url, data=msg, headers=headers, timeout=10)
+ if (resp.status_code<199 & resp.status_code > 299):
+ print("Failed, response code: " + str(resp.status_code))
+ sleep(1)
+ else:
+ print("Dmaap response code: " + str(resp.status_code))
+ print("Dmaap response text: " + str(resp.text))
+ with lock:
+ topic_queue.pop(0)
+ cntr_msg_requests_fetched += 1
+ except Exception as e:
+ print("Failed, exception: "+ str(e))
+ sleep(1)
+ sleep(0.01)
+