Update version number in container-tag for F Maintenance Release
[sim/a1-interface.git] / near-rt-ric-simulator / test / common / publish_response_event_to_kafka_bus.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2022 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 # This script publishes a response-event to a kafka bus
19 # In order to use this script, you must have an venv for Python and kafka-python libs has to be installed
20 # To instal kafka-python please use: pip install kafka-python
21 # Example of an response-event json
22 #{
23   #"response-code": "400",
24   #"error-info": "Bad format"
25 #}
26
27
28 import os
29 import json
30 import sys
31
32 from kafka import KafkaProducer
33
34 # Response string with JSON format
35 response_data_JSON =  """
36 {
37   "response-code": 200,
38   "error-info": ""
39 }
40 """
41
42 # Instantiate KafkaProducer with keyword arguments
43 # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
44 def create_kafka_producer():
45
46   producer = KafkaProducer(
47     bootstrap_servers = ['localhost:9092'],
48     key_serializer = str.encode,
49     value_serializer = lambda m: json.dumps(m).encode('ascii'),
50   )
51   return producer
52
53 # Helper: Publishes (to) the target broker and the topic in synch
54 def publish(kafka_evet, req_id, targettopic):
55
56   # Instantiate KafkaProducer with keyword arguments
57   producer = create_kafka_producer()
58   # Assigns an id to each request that is supposed to get a result
59   # req_id  = 'Hll1EsycKLNRric7'
60
61   try:
62
63     # synch-publish
64     # KafkaProducer.send(topicname, value=broker_message, key=req_id, headers=None, partition=None, timestamp_ms=None)
65     fut_rec_metadata = producer.send(targettopic, kafka_evet, req_id)
66     return fut_rec_metadata.get()
67
68   except Exception as err:
69     print('Error while publish', err)
70   finally:
71     producer.close()
72
73 if __name__ == '__main__':
74     try:
75
76         requestid = sys.argv[1]
77         targettopic = sys.argv[2]
78         # response_data_JSON is str
79         future = publish(response_data_JSON, requestid, targettopic)
80
81         if (future is not None):
82             print (0)
83         else:
84             print (1)
85
86     except Exception:
87         print (1)
88     sys.exit()