Sample producer consumer to get kafka broker from ICS 91/12891/3 master
authorlapentafd <francesco.lapenta@est.tech>
Thu, 9 May 2024 03:50:54 +0000 (04:50 +0100)
committerFrancesco Davide Lapenta <francesco.lapenta@est.tech>
Thu, 9 May 2024 10:12:07 +0000 (10:12 +0000)
Issue-ID: NONRTRIC-965
Change-Id: I3f3686afaf5d7cc5193142a5b69ac558ea1d5e90
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
.github/workflows/gerrit-merge.yaml [moved from .github/workflows/gerrit-novote-merge.yaml with 98% similarity]
sample-services/hello-world-sme-invoker/src/main/java/org/oransc/nonrtric/sample/rest/HelloWorldSmeInvokerComponent.java
sample-services/ics-producer-consumer/.gitignore
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java
sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml
sample-services/ics-producer-consumer/start.sh
sample-services/ics-producer-consumer/utils.sh

similarity index 98%
rename from .github/workflows/gerrit-novote-merge.yaml
rename to .github/workflows/gerrit-merge.yaml
index 995b8bd..01cd32b 100644 (file)
@@ -66,7 +66,6 @@ jobs:
           gerrit-change-number: ${{ inputs.GERRIT_CHANGE_NUMBER }}
           gerrit-patchset-number: ${{ inputs.GERRIT_PATCHSET_NUMBER }}
           vote-type: clear
-          comment-only: true
       - name: Allow replication
         run: sleep 10s
 
@@ -119,4 +118,3 @@ jobs:
           gerrit-change-number: ${{ inputs.GERRIT_CHANGE_NUMBER }}
           gerrit-patchset-number: ${{ inputs.GERRIT_PATCHSET_NUMBER }}
           vote-type: ${{ env.WORKFLOW_CONCLUSION }}
-          comment-only: true
index 7d76b4f..7902a3b 100644 (file)
@@ -60,24 +60,22 @@ public class HelloWorldSmeInvokerComponent {
                 throw new CapifAccessException("Unexpected error");\r
             }\r
 \r
-            //TODO The below should be uncommented once SME Manager provides an accessible URI\r
-\r
-//            String helloWorldEndpoint = "";\r
-//            List<String> apiSetEndpoints = getApiSetEndpoints(apiResponse, baseUrl);\r
-//            if (apiSetEndpoints != null && !apiSetEndpoints.isEmpty()) {\r
-//                helloWorldEndpoint = apiSetEndpoints.get(0);\r
-//            }\r
-//\r
-//            if (helloWorldEndpoint != null && !helloWorldEndpoint.isEmpty()) {\r
-//                try {\r
-//                    String responseHelloWorld = restTemplate.getForObject(helloWorldEndpoint, String.class);\r
-//                    logger.info("Response :- ", responseHelloWorld);\r
-//                } catch (IllegalArgumentException e) {\r
-//                    throw new CapifAccessException("Error accessing the URL :- " + helloWorldEndpoint);\r
-//                } catch (Exception e) {\r
-//                    throw new CapifAccessException("Unexpected error");\r
-//                }\r
-//            }\r
+            String helloWorldEndpoint = "";\r
+            List<String> apiSetEndpoints = getApiSetEndpoints(apiResponse, baseUrl);\r
+            if (apiSetEndpoints != null && !apiSetEndpoints.isEmpty()) {\r
+                helloWorldEndpoint = apiSetEndpoints.get(0);\r
+            }\r
+\r
+            if (helloWorldEndpoint != null && !helloWorldEndpoint.isEmpty()) {\r
+                try {\r
+                    String responseHelloWorld = restTemplate.getForObject(helloWorldEndpoint, String.class);\r
+                    logger.info("rApp SME Provider Response : {}", responseHelloWorld);\r
+                } catch (IllegalArgumentException e) {\r
+                    throw new CapifAccessException("Error accessing the URL :- " + helloWorldEndpoint);\r
+                } catch (Exception e) {\r
+                    throw new CapifAccessException("Unexpected error");\r
+                }\r
+            }\r
         }\r
     }\r
 \r
index 851f236..48d83d4 100644 (file)
@@ -21,6 +21,7 @@ logs/
 *.zip
 *.tar.gz
 *.rar
+*.tgz
 
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
index 430ab59..1d91afb 100644 (file)
@@ -72,7 +72,6 @@ public class SimpleConsumer extends AbstractSimpleKafka {
         // make the consumer available for graceful shutdown
         setKafkaConsumer(consumer);
         consumer.assign(Collections.singleton(new TopicPartition(topicName, 0)));
-        //consumer.seekToBeginning(consumer.assignment()); //--from-beginning
         int recNum = numOfRecs;
         while (recNum > 0) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(TIME_OUT_MS));
index 5bc6821..bbf91b9 100644 (file)
@@ -47,14 +47,11 @@ import com.google.gson.GsonBuilder;
 @RequestMapping(path = "/producer", produces = "application/json")
 public class ProducerController {
     private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
-
     private static Gson gson = new GsonBuilder().create();
-
     private final Jobs jobs;
     private final InfoTypes types;
     private String topicName = "mytopic";
 
-
     public ProducerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
         this.jobs = jobs;
         this.types = types;
index 7dc2b1e..d7d9f98 100644 (file)
@@ -33,26 +33,41 @@ import com.demo.producer.producer.SimpleProducer;
 @Component
 public class PropertiesHelper {
     private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+    private static String kafkaServers = null;
 
     public static Properties getProperties() throws Exception {
-        Properties props = null;
+        Properties props = new Properties();
         try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) {
-            props = new Properties();
             if (input == null) {
-                log.error("Found no configuration file in resources");
-                throw new Exception("Sorry, unable to find config.properties");
+                log.error("Failed to load configuration file 'config.properties'");
+                throw new IOException("Configuration file 'config.properties' not found");
             }
             props.load(input);
-            String kafkaServers = System.getenv("KAFKA_SERVERS");
-            if (kafkaServers != null) {
+            setBootstrapServers(props);
+        } catch (IOException e) {
+            log.error("Error reading configuration file: ", e);
+            throw e;
+        }
+        return props;
+    }
+
+    private static void setBootstrapServers(Properties props) {
+        if (kafkaServers != null && !kafkaServers.isEmpty()) {
+            props.setProperty("bootstrap.servers", kafkaServers);
+            log.info("Using actively bootstrap servers: {}", kafkaServers);
+        } else {
+            String kafkaServersEnv = System.getenv("KAFKA_SERVERS");
+            if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) {
+                kafkaServers = kafkaServersEnv;
                 props.setProperty("bootstrap.servers", kafkaServers);
-                log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+                log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers);
             } else {
-                log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+                log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file");
             }
-        } catch (IOException e) {
-            log.error("Error reading configuration file: ", e.getMessage());
         }
-        return props;
+    }
+
+    public static void setKafkaServers(String servers) {
+        kafkaServers = servers;
     }
 }
index f9cffd8..e929e44 100644 (file)
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import com.demo.producer.messages.PropertiesHelper;
 import com.demo.producer.repository.Job.Parameters;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -55,9 +56,20 @@ public class Jobs {
 
     public void addJob(String id, InfoType type, String owner, Parameters parameters) {
         Job job = new Job(id, type, owner, parameters);
+        setKafkaServersEnvironment(job);
         this.put(job);
     }
 
+    private void setKafkaServersEnvironment(Job job) {
+        String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers();
+        if (kafkaServers != null && !kafkaServers.isEmpty()) {
+            PropertiesHelper.setKafkaServers(kafkaServers);
+            logger.info("Setting variable bootStrapServers: {}", kafkaServers);
+        } else {
+            logger.warn("bootStrapServers is not set for job: {}", job.getId());
+        }
+    }
+
     private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
index 18317a0..e05883e 100755 (executable)
@@ -71,16 +71,6 @@ wait_for_container "kafka-consumer" "Started Application"
 echo "Kafka container is up and running. Starting producer and consumer..."
 space
 
-echo "Start 1 Producer on mytopic"
-curl -X GET http://localhost:8080/startProducer/mytopic
-space
-
-echo "Start 1 Consumer on mytopic"
-curl -X GET http://localhost:8081/startConsumer/mytopic
-space
-
-sleep 10
-
 echo "Sending type1 to ICS"
 curl -X 'PUT' \
   'http://localhost:8083/data-producer/v1/info-types/type1' \
@@ -91,8 +81,10 @@ curl -X 'PUT' \
     "$schema":"http://json-schema.org/draft-07/schema#",
     "title":"STD_Type1_1.0.0",
     "description":"Type 1",
-    "type":"object"
-  }
+    "topic": "mytopic",
+    "bootStrapServers": "http://kafka-zkless:9092",
+    "numberOfMessages": 0
+    }
 }'
 
 echo "Getting types from ICS"
@@ -145,14 +137,43 @@ curl -X 'PUT' \
   -H 'accept: application/json' \
   -H 'Content-Type: application/json' \
   -d '{
-  "status_result_uri": "http://kafka-consumer:8081/info-type-status",
-  "owner": "owner"
+  "status_result_uri": "http://kafka-consumer:8081/consumer/info-type-status",
+  "owner": "demo"
 }'
 echo "Getting Consumer Subscription Job infos from ICS"
 curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
 space
 
-sleep 5
+#TEST To set kafka broker in the consumer
+curl -X 'POST' \
+  'http://localhost:8081/consumer/job/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_type_id": "type1",
+  "job_owner": "demo",
+  "job_definition": {
+    "deliveryInfo": {
+      "topic": "mytopic",
+      "bootStrapServers": "http://kafka-zkless:9092",
+      "numberOfMessages": 0
+    }
+  },
+  "job_result_uri": "http://kafka-producer:8080/producer/job",
+  "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+#Using the autostart flag in the application.yaml
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+#curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
 echo "ICS Producer Docker logs "
 docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
 space
@@ -170,6 +191,7 @@ containers=("kafka-producer" "kafka-consumer")
 for container in "${containers[@]}"; do
   if docker logs "$container" | grep -q ERROR; then
     echo "Errors found in logs of $container"
+    docker logs "$container" | grep ERROR
     echo "FAIL"
     exit 1
   else
index 68d19ec..af1cfa0 100644 (file)
@@ -69,28 +69,45 @@ checkDockerCompose() {
     fi
 }
 
-# Function to wait for a Docker container to be running and log a specific string
+# Function to wait for a Docker container to be running and log a specific string with a maximum timeout of 20 minutes
 wait_for_container() {
     local container_name="$1"
     local log_string="$2"
+    local timeout=1200  # Timeout set to 20 minutes (20 minutes * 60 seconds)
+
+    local start_time=$(date +%s)
+    local end_time=$((start_time + timeout))
 
     while ! docker inspect "$container_name" &>/dev/null; do
         echo "Waiting for container '$container_name' to be created..."
         sleep 5
+        if [ "$(date +%s)" -ge "$end_time" ]; then
+            echo "Timeout: Container creation exceeded 20 minutes."
+            exit 1
+        fi
     done
 
     while [ "$(docker inspect -f '{{.State.Status}}' "$container_name")" != "running" ]; do
         echo "Waiting for container '$container_name' to be running..."
         sleep 5
+        if [ "$(date +%s)" -ge "$end_time" ]; then
+            echo "Timeout: Container start exceeded 20 minutes."
+            exit 1
+        fi
     done
 
     # Check container logs for the specified string
     while ! docker logs "$container_name" 2>&1 | grep "$log_string"; do
         echo "Waiting for '$log_string' in container logs of '$container_name'..."
         sleep 5
+        if [ "$(date +%s)" -ge "$end_time" ]; then
+            echo "Timeout: Log string not found within 20 minutes."
+            exit 1
+        fi
     done
 }
 
+
 space() {
     echo ""
     echo "++++++++++++++++++++++++++++++++++++++++++++++++++++"