+# Function to upload generic messages to dmaap
+def dmaap_generic_uploader():
+ global msg_requests
+ global cntr_msg_requests_fetched
+
+ print("Starting generic uploader")
+
+ headers_json = {'Content-type': 'application/json', 'Accept': '*/*'}
+ headers_text = {'Content-type': 'text/plain', 'Accept': '*/*'}
+
+ while True:
+ if (len(generic_messages)):
+ keys_copy = list(generic_messages.keys())
+ for topicname in keys_copy: #topicname contains the path of the topics, eg. "/event/<topic>"
+ topic_queue=generic_messages[topicname]
+ if (len(topic_queue)>0):
+ if (topicname.endswith(".text")):
+ msg=topic_queue[0]
+ headers=headers_text
+ else:
+ msg=topic_queue[0]
+ msg=json.dumps(msg)
+ headers=headers_json
+ url=generic_topics_upload_baseurl+topicname
+ print("Sending to dmaap : "+ url)
+ print("Sending to dmaap : "+ msg)
+ print("Sending to dmaap : "+ str(headers))
+ try:
+ resp=requests.post(url, data=msg, headers=headers, timeout=10)
+ if (resp.status_code<199 & resp.status_code > 299):
+ print("Failed, response code: " + str(resp.status_code))
+ sleep(1)
+ else:
+ print("Dmaap response code: " + str(resp.status_code))
+ print("Dmaap response text: " + str(resp.text))
+ with lock:
+ topic_queue.pop(0)
+ cntr_msg_requests_fetched += 1
+ except Exception as e:
+ print("Failed, exception: "+ str(e))
+ sleep(1)
+ sleep(0.01)
+