1 # ============LICENSE_START===============================================
2 # Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 # ========================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=================================================
18 # This script publishes a response-event to a kafka bus
19 # In order to use this script, you must have an venv for Python and kafka-python libs has to be installed
20 # To instal kafka-python please use: pip install kafka-python
21 # Example of an response-event json
23 #"response-code": "400",
24 #"error-info": "Bad format"
32 from kafka import KafkaProducer
34 # Response string with JSON format
35 response_data_JSON = """
42 # Instantiate KafkaProducer with keyword arguments
43 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
44 def create_kafka_producer():
46 producer = KafkaProducer(
47 bootstrap_servers = ['localhost:9092'],
48 key_serializer = str.encode,
49 value_serializer = lambda m: json.dumps(m).encode('ascii'),
53 # Helper: Publishes (to) the target broker and the topic in synch
54 def publish(kafka_evet, req_id, targettopic):
56 # Instantiate KafkaProducer with keyword arguments
57 producer = create_kafka_producer()
58 # Assigns an id to each request that is supposed to get a result
59 # req_id = 'Hll1EsycKLNRric7'
64 # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
65 fut_rec_metadata = producer.send(targettopic, kafka_evet, req_id)
66 return fut_rec_metadata.get()
68 except Exception as err:
69 print('Error while publish', err)
73 if __name__ == '__main__':
76 requestid = sys.argv[1]
77 targettopic = sys.argv[2]
78 # response_data_JSON is str
79 future = publish(response_data_JSON, requestid, targettopic)
81 if (future is not None):