Sample producer consumer to get kafka broker from ICS 91/12891/3 master
authorlapentafd <francesco.lapenta@est.tech>
Thu, 9 May 2024 03:50:54 +0000 (04:50 +0100)
committerFrancesco Davide Lapenta <francesco.lapenta@est.tech>
Thu, 9 May 2024 10:12:07 +0000 (10:12 +0000)
Issue-ID: NONRTRIC-965
Change-Id: I3f3686afaf5d7cc5193142a5b69ac558ea1d5e90
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
sample-services/ics-producer-consumer/.gitignore
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java
sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml
sample-services/ics-producer-consumer/start.sh

index 851f236..48d83d4 100644 (file)
@@ -21,6 +21,7 @@ logs/
 *.zip
 *.tar.gz
 *.rar
 *.zip
 *.tar.gz
 *.rar
+*.tgz
 
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
 
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
index 7dc2b1e..d7d9f98 100644 (file)
@@ -33,26 +33,41 @@ import com.demo.producer.producer.SimpleProducer;
 @Component
 public class PropertiesHelper {
     private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
 @Component
 public class PropertiesHelper {
     private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+    private static String kafkaServers = null;
 
     public static Properties getProperties() throws Exception {
 
     public static Properties getProperties() throws Exception {
-        Properties props = null;
+        Properties props = new Properties();
         try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) {
         try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) {
-            props = new Properties();
             if (input == null) {
             if (input == null) {
-                log.error("Found no configuration file in resources");
-                throw new Exception("Sorry, unable to find config.properties");
+                log.error("Failed to load configuration file 'config.properties'");
+                throw new IOException("Configuration file 'config.properties' not found");
             }
             props.load(input);
             }
             props.load(input);
-            String kafkaServers = System.getenv("KAFKA_SERVERS");
-            if (kafkaServers != null) {
+            setBootstrapServers(props);
+        } catch (IOException e) {
+            log.error("Error reading configuration file: ", e);
+            throw e;
+        }
+        return props;
+    }
+
+    private static void setBootstrapServers(Properties props) {
+        if (kafkaServers != null && !kafkaServers.isEmpty()) {
+            props.setProperty("bootstrap.servers", kafkaServers);
+            log.info("Using actively bootstrap servers: {}", kafkaServers);
+        } else {
+            String kafkaServersEnv = System.getenv("KAFKA_SERVERS");
+            if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) {
+                kafkaServers = kafkaServersEnv;
                 props.setProperty("bootstrap.servers", kafkaServers);
                 props.setProperty("bootstrap.servers", kafkaServers);
-                log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+                log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers);
             } else {
             } else {
-                log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+                log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file");
             }
             }
-        } catch (IOException e) {
-            log.error("Error reading configuration file: ", e.getMessage());
         }
         }
-        return props;
+    }
+
+    public static void setKafkaServers(String servers) {
+        kafkaServers = servers;
     }
 }
     }
 }
index f9cffd8..e929e44 100644 (file)
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import com.demo.producer.messages.PropertiesHelper;
 import com.demo.producer.repository.Job.Parameters;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.demo.producer.repository.Job.Parameters;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -55,9 +56,20 @@ public class Jobs {
 
     public void addJob(String id, InfoType type, String owner, Parameters parameters) {
         Job job = new Job(id, type, owner, parameters);
 
     public void addJob(String id, InfoType type, String owner, Parameters parameters) {
         Job job = new Job(id, type, owner, parameters);
+        setKafkaServersEnvironment(job);
         this.put(job);
     }
 
         this.put(job);
     }
 
+    private void setKafkaServersEnvironment(Job job) {
+        String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers();
+        if (kafkaServers != null && !kafkaServers.isEmpty()) {
+            PropertiesHelper.setKafkaServers(kafkaServers);
+            logger.info("Setting variable bootStrapServers: {}", kafkaServers);
+        } else {
+            logger.warn("bootStrapServers is not set for job: {}", job.getId());
+        }
+    }
+
     private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
     private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
index 31966df..28ae26c 100644 (file)
@@ -16,7 +16,7 @@
 #
 vars:
   time: 1000
 #
 vars:
   time: 1000
-  autostart: true
+  autostart: false
   topic: mytopic #This topic is used only in autostart
 
 spring:
   topic: mytopic #This topic is used only in autostart
 
 spring:
index 94da48e..e05883e 100755 (executable)
@@ -71,17 +71,6 @@ wait_for_container "kafka-consumer" "Started Application"
 echo "Kafka container is up and running. Starting producer and consumer..."
 space
 
 echo "Kafka container is up and running. Starting producer and consumer..."
 space
 
-#Using the autostart flag in the application.yaml
-echo "Start 1 Producer on mytopic"
-#curl -X GET http://localhost:8080/startProducer/mytopic
-space
-
-echo "Start 1 Consumer on mytopic"
-#curl -X GET http://localhost:8081/startConsumer/mytopic
-space
-
-sleep 10
-
 echo "Sending type1 to ICS"
 curl -X 'PUT' \
   'http://localhost:8083/data-producer/v1/info-types/type1' \
 echo "Sending type1 to ICS"
 curl -X 'PUT' \
   'http://localhost:8083/data-producer/v1/info-types/type1' \
@@ -92,8 +81,10 @@ curl -X 'PUT' \
     "$schema":"http://json-schema.org/draft-07/schema#",
     "title":"STD_Type1_1.0.0",
     "description":"Type 1",
     "$schema":"http://json-schema.org/draft-07/schema#",
     "title":"STD_Type1_1.0.0",
     "description":"Type 1",
-    "type":"object"
-  }
+    "topic": "mytopic",
+    "bootStrapServers": "http://kafka-zkless:9092",
+    "numberOfMessages": 0
+    }
 }'
 
 echo "Getting types from ICS"
 }'
 
 echo "Getting types from ICS"
@@ -146,14 +137,43 @@ curl -X 'PUT' \
   -H 'accept: application/json' \
   -H 'Content-Type: application/json' \
   -d '{
   -H 'accept: application/json' \
   -H 'Content-Type: application/json' \
   -d '{
-  "status_result_uri": "http://kafka-consumer:8081/info-type-status",
-  "owner": "owner"
+  "status_result_uri": "http://kafka-consumer:8081/consumer/info-type-status",
+  "owner": "demo"
 }'
 echo "Getting Consumer Subscription Job infos from ICS"
 curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
 space
 
 }'
 echo "Getting Consumer Subscription Job infos from ICS"
 curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
 space
 
-sleep 5
+#TEST To set kafka broker in the consumer
+curl -X 'POST' \
+  'http://localhost:8081/consumer/job/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_type_id": "type1",
+  "job_owner": "demo",
+  "job_definition": {
+    "deliveryInfo": {
+      "topic": "mytopic",
+      "bootStrapServers": "http://kafka-zkless:9092",
+      "numberOfMessages": 0
+    }
+  },
+  "job_result_uri": "http://kafka-producer:8080/producer/job",
+  "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+#Using the autostart flag in the application.yaml
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+#curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
 echo "ICS Producer Docker logs "
 docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
 space
 echo "ICS Producer Docker logs "
 docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
 space