return producer
# Helper: Publishes (to) the target broker and the topic in synch
-def publish(kafka_evet, req_id):
+def publish(kafka_evet, req_id, targettopic):
# Instantiate KafkaProducer with keyword arguments
producer = create_kafka_producer()
# synch-publish
# KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
- fut_rec_metadata = producer.send('kafkatopicres', kafka_evet, req_id)
+ fut_rec_metadata = producer.send(targettopic, kafka_evet, req_id)
return fut_rec_metadata.get()
except Exception as err:
try:
requestid = sys.argv[1]
+ targettopic = sys.argv[2]
# response_data_JSON is str
- future = publish(response_data_JSON, requestid)
+ future = publish(response_data_JSON, requestid, targettopic)
if (future is not None):
print (0)