Sample producer consumer to get kafka broker from ICS
[nonrtric.git] / sample-services / ics-producer-consumer / start.sh
1 #  ========================LICENSE_START=================================
2 #  O-RAN-SC
3 #
4 #  Copyright (C) 2024: OpenInfra Foundation Europe
5 #  ========================================================================
6 #  Licensed under the Apache License, Version 2.0 (the "License");
7 #  you may not use this file except in compliance with the License.
8 #  You may obtain a copy of the License at
9 #
10 #       http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #  Unless required by applicable law or agreed to in writing, software
13 #  distributed under the License is distributed on an "AS IS" BASIS,
14 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #  See the License for the specific language governing permissions and
16 #  limitations under the License.
17 #  ============LICENSE_END=================================================
#!/bin/bash
# NOTE(review): this shebang comes after the licence header. A shebang is only
# honoured on the very first line of a file, so invoke this script explicitly
# with `bash start.sh`.

# Work from the directory that holds this script. utils.sh (sourced below) and
# the ./application.yaml bind mount used when starting ICS are both addressed
# relative to it, so the script must not depend on the caller's directory.
cd -- "$(dirname -- "${BASH_SOURCE[0]}")" || exit 1

# utils.sh is expected to provide the wait_for_container and space helpers
# called later in this script; abort early instead of running without them.
if ! source ./utils.sh; then
  echo "Unable to load utils.sh" >&2
  exit 1
fi

# Registry prefix and tag of the sample producer/consumer images.
PREFIX="nexus3.o-ran-sc.org:10004"
VERSION="0.0.1"

# Create a network for Kafka Containers, reusing it when a previous run
# already created it (docker network create fails on an existing name).
if ! docker network inspect kafka-net >/dev/null 2>&1; then
  docker network create kafka-net
fi
26
# Start Kafka
# The broker container formats its storage with a freshly generated cluster id
# and then starts from config/kraft/server.properties, overriding the listener
# settings with the KAFKA_* environment variables passed to the container.
# The command is single-quoted so that every $... is expanded by the shell
# inside the container, not by this script.
kafka_start_cmd='export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && \
  bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && \
  bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$KAFKA_ADVERTISED_LISTENERS --override listener.security.protocol.map=$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP --override listeners=$KAFKA_LISTENERS'

kafka_run_args=(
  --detach
  --network kafka-net
  --name kafka-zkless
  --publish 9092:9092
  --env 'LOG_DIR=/tmp/logs'
  --env 'KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
  --env 'KAFKA_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093'
  --env 'KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-zkless:29092,PLAINTEXT_HOST://kafka-zkless:9092'
  quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64
  /bin/sh -c "$kafka_start_cmd"
)

docker run "${kafka_run_args[@]}"
40
# Start ICS
# Runs the Information Coordinator Service on the Kafka network, publishing
# port 8083 and mounting the local application.yaml over the image's config.
# The host side of the bind mount is passed as an absolute path: Docker CLI
# releases older than 23.0 do not accept a relative "./..." source for -v and
# treat it as an (invalid) volume name instead.
docker run -d \
  --network kafka-net \
  --name informationcoordinatorservice \
  -p 8083:8083 \
  -v "${PWD}/application.yaml:/opt/app/information-coordinator-service/config/application.yaml" \
  nexus3.o-ran-sc.org:10001/o-ran-sc/nonrtric-plt-informationcoordinatorservice:1.6.0
48
# Start Producer
# Sample ICS producer, reachable on port 8080 and pointed at the broker via
# KAFKA_SERVERS. The image reference is quoted so the PREFIX/VERSION
# expansions can never be word-split or glob-expanded.
docker run -d \
  --network kafka-net \
  --name kafka-producer \
  -p 8080:8080 \
  -e KAFKA_SERVERS=kafka-zkless:9092 \
  "${PREFIX}/o-ran-sc/nonrtric-sample-icsproducer:${VERSION}"
56
# Start Consumer
# Sample ICS consumer, reachable on port 8081 and pointed at the broker via
# KAFKA_SERVERS. The image reference is quoted so the PREFIX/VERSION
# expansions can never be word-split or glob-expanded.
docker run -d \
  --network kafka-net \
  --name kafka-consumer \
  -p 8081:8081 \
  -e KAFKA_SERVERS=kafka-zkless:9092 \
  "${PREFIX}/o-ran-sc/nonrtric-sample-icsconsumer:${VERSION}"
64
# Wait for Kafka, the producer and the consumer, in that order.
# Each entry is "<container name>|<text handed to wait_for_container>".
# wait_for_container is not defined in this file (presumably it comes from
# utils.sh and blocks until the text shows up in the container log - confirm).
readiness_checks=(
  "kafka-zkless|Kafka Server started"
  "kafka-producer|Started Application"
  "kafka-consumer|Started Application"
)

for check in "${readiness_checks[@]}"; do
  wait_for_container "${check%%|*}" "${check#*|}"
done

# All waits have returned: announce that the demo requests are about to start.
printf '%s\n' "Kafka container is up and running. Starting producer and consumer..."
space
73
# Register info type "type1" through the ICS data-producer API (published on
# localhost:8083), then read it back. The JSON is single-quoted so "$schema"
# reaches curl literally.
type1_definition='{
  "info_job_data_schema": {
    "$schema":"http://json-schema.org/draft-07/schema#",
    "title":"STD_Type1_1.0.0",
    "description":"Type 1",
    "topic": "mytopic",
    "bootStrapServers": "http://kafka-zkless:9092",
    "numberOfMessages": 0
    }
}'

printf '%s\n' "Sending type1 to ICS"
curl --request 'PUT' \
  --header 'accept: application/json' \
  --header 'Content-Type: application/json' \
  --data "$type1_definition" \
  'http://localhost:8083/data-producer/v1/info-types/type1'

printf '%s\n' "Getting types from ICS"
curl --request 'GET' 'http://localhost:8083/data-producer/v1/info-types/type1'
space
93
# Register info producer "1" with ICS, declaring type1 as its supported type
# and the kafka-producer container's endpoints as callbacks, then read the
# registration back.
producer_registration='{
  "info_producer_supervision_callback_url": "http://kafka-producer:8080/producer/supervision",
  "supported_info_types": [
    "type1"
  ],
  "info_job_callback_url": "http://kafka-producer:8080/producer/job"
}'

printf '%s\n' "Sending Producer infos to ICS"
curl --request 'PUT' \
  --header 'accept: application/json' \
  --header 'Content-Type: application/json' \
  --data "$producer_registration" \
  'http://localhost:8083/data-producer/v1/info-producers/1'

printf '%s\n' "Getting Producers Infos from ICS"
curl --header 'Content-Type: application/json' \
  'http://localhost:8083/data-producer/v1/info-producers/1'
space
110
# Create info job "1" for type1 through the ICS data-consumer API, with the
# Kafka topic and broker in its deliveryInfo, then read the job back.
consumer_job='{
  "info_type_id": "type1",
  "job_owner": "demo",
  "job_definition": {
    "deliveryInfo": {
      "topic": "mytopic",
      "bootStrapServers": "http://kafka-zkless:9092",
      "numberOfMessages": 0
    }
  },
  "job_result_uri": "http://kafka-producer:8080/producer/job",
  "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
}'

printf '%s\n' "Sending Consumer Job infos to ICS"
curl --request 'PUT' \
  --header 'accept: application/json' \
  --header 'Content-Type: application/json' \
  --data "$consumer_job" \
  'http://localhost:8083/data-consumer/v1/info-jobs/1'

printf '%s\n' "Getting Consumer Job Infos from ICS"
curl --header 'Content-Type: application/json' \
  'http://localhost:8083/data-consumer/v1/info-jobs/1'
space
133
# Create info-type subscription "1" in ICS, naming the kafka-consumer
# container's info-type-status endpoint as the result URI, then read it back.
type_subscription='{
  "status_result_uri": "http://kafka-consumer:8081/consumer/info-type-status",
  "owner": "demo"
}'

printf '%s\n' "Sending Consumer Subscription Job infos to ICS"
curl --request 'PUT' \
  --header 'accept: application/json' \
  --header 'Content-Type: application/json' \
  --data "$type_subscription" \
  'http://localhost:8083/data-consumer/v1/info-type-subscription/1'
printf '%s\n' "Getting Consumer Subscription Job infos from ICS"
curl --request 'GET' \
  --header 'accept: application/json' \
  'http://localhost:8083/data-consumer/v1/info-type-subscription/1'
space
146
#TEST To set kafka broker in the consumer
# Posts the job definition straight to the consumer container (port 8081)
# rather than to ICS; the payload carries the topic and broker address.
direct_consumer_job='{
  "info_type_id": "type1",
  "job_owner": "demo",
  "job_definition": {
    "deliveryInfo": {
      "topic": "mytopic",
      "bootStrapServers": "http://kafka-zkless:9092",
      "numberOfMessages": 0
    }
  },
  "job_result_uri": "http://kafka-producer:8080/producer/job",
  "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
}'

curl --request 'POST' \
  --header 'accept: application/json' \
  --header 'Content-Type: application/json' \
  --data "$direct_consumer_job" \
  'http://localhost:8081/consumer/job/1'
165
#Using the autostart flag in the application.yaml
# Only the producer is started over HTTP. The matching consumer request is
# kept below as a comment (presumably the autostart flag mentioned above
# covers the consumer - confirm), so the consumer banner is printed without
# issuing any request.
printf '%s\n' "Start 1 Producer on mytopic"
curl --request GET 'http://localhost:8080/startProducer/mytopic'
space

printf '%s\n' "Start 1 Consumer on mytopic"
#curl -X GET http://localhost:8081/startConsumer/mytopic
space

# Fixed pause before the container logs are inspected.
sleep 10
176
#######################################
# Print a heading, then the log lines of one container that match a pattern.
# Arguments: $1 - heading to print
#            $2 - container name
#            $3.. - arguments handed to grep unchanged (options and pattern)
# Outputs:   heading and matching log lines on stdout, then calls space
#######################################
print_matching_logs() {
  local heading=$1
  local container=$2
  shift 2

  echo "$heading"
  docker logs "$container" | grep "$@"
  space
}

print_matching_logs "ICS Producer Docker logs " informationcoordinatorservice \
  -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
print_matching_logs "Demo Producer Docker logs " kafka-producer c.d.p.p.SimpleProducer
print_matching_logs "Demo Consumer Docker logs " kafka-consumer c.d.c.c.SimpleConsumer

echo "Done."
188
# Final verdict: the run fails if the producer or consumer logged a line
# containing ERROR, or if a container's logs cannot be read at all.
containers=("kafka-producer" "kafka-consumer")

for container in "${containers[@]}"; do
  # docker logs writes the container's stderr to its own stderr, so merge both
  # streams before scanning. The logs are captured once so the check and the
  # report below look at the same snapshot.
  if ! container_logs="$(docker logs "$container" 2>&1)"; then
    # e.g. the container was never created; do not report that as a pass
    echo "Could not read logs of $container"
    printf '%s\n' "$container_logs"
    echo "FAIL"
    exit 1
  fi

  if grep -q ERROR <<<"$container_logs"; then
    echo "Errors found in logs of $container"
    grep ERROR <<<"$container_logs"
    echo "FAIL"
    exit 1
  else
    echo "No errors found in logs of $container"
  fi
done
echo "SUCCESS"
exit 0