Sample consumer to get kafka broker from ICS 06/12906/1
authorlapentafd <francesco.lapenta@est.tech>
Tue, 14 May 2024 06:55:30 +0000 (07:55 +0100)
committerlapentafd <francesco.lapenta@est.tech>
Tue, 14 May 2024 06:55:37 +0000 (07:55 +0100)
Issue-ID: NONRTRIC-965
Change-Id: Ie69a514edd7f49ec83cd16f41aeec61d74fbf13e
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/JobDataSchema.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java
sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml
sample-services/ics-producer-consumer/start.sh

index 63cc215..30225c7 100644 (file)
@@ -32,8 +32,9 @@ import org.springframework.web.bind.annotation.RestController;
 import com.demo.consumer.repository.InfoType;
 import com.demo.consumer.repository.InfoTypes;
 import com.demo.consumer.repository.Job.Parameters;
+import com.demo.consumer.repository.Job.Parameters.KafkaDeliveryInfo;
 import com.demo.consumer.dme.ConsumerJobInfo;
-import com.demo.consumer.dme.ConsumerStatusInfo;
+import com.demo.consumer.dme.JobDataSchema;
 import com.demo.consumer.repository.Jobs;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -68,14 +69,22 @@ public class ConsumerController {
             this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner,
                     toJobParameters(request.jobDefinition));
         } catch (Exception e) {
-            log.error("Error adding the job" + infoJobId, e.getMessage());
+            log.error("Error adding the job " + infoJobId + "{}", e.getMessage());
         }
     }
 
     @PostMapping("/info-type-status")
     public void statusChange(@RequestBody String requestBody) {
-        ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class);
-        log.info("Add Status Job Info", request);
+        JobDataSchema request = gson.fromJson(requestBody, JobDataSchema.class);
+        log.debug("Body Received: {}" , requestBody);
+        try {
+            this.jobs.addJob(request.getInfo_type_id(), types.getType(request.getInfo_type_id()), "",
+                new Parameters(new KafkaDeliveryInfo(
+                        request.getJob_data_schema().getTopic(),
+                        request.getJob_data_schema().getBootStrapServers(), 0)));
+        } catch (Exception e) {
+            log.error("Error adding the info type " + request.getInfo_type_id() + "{}", e.getMessage());
+        }
     }
 
     private Parameters toJobParameters(Object jobData) {
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/JobDataSchema.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/JobDataSchema.java
new file mode 100644 (file)
index 0000000..678f536
--- /dev/null
@@ -0,0 +1,59 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.dme;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+
+import com.demo.consumer.repository.Job.Parameters.KafkaDeliveryInfo;
+import com.google.gson.Gson;
+
+@Data
+public class JobDataSchema {
+
+    public enum InfoJobStatusValues {
+        REGISTERED, UNREGISTERED
+    }
+    @SerializedName("info_type_id")
+    private String info_type_id;
+    @SerializedName("job_data_schema")
+    private DataSchema job_data_schema;
+    @SerializedName("status")
+    private InfoJobStatusValues status;
+
+    @Override
+    public String toString() {
+        return new Gson().toJson(this);
+    }
+
+    @Getter
+    @Setter
+    public class DataSchema {
+        private String title;
+        private String description;
+        @SerializedName("topic")
+        private String topic;
+        @SerializedName("bootStrapServers")
+        private String bootStrapServers;
+    }
+}
index 18be2f8..c4bd436 100644 (file)
@@ -33,26 +33,41 @@ import com.demo.consumer.consumer.SimpleConsumer;
 @Component
 public class PropertiesHelper {
     private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+    private static String kafkaServers = null;
 
     public static Properties getProperties() throws Exception {
-        Properties props = null;
+        Properties props = new Properties();
         try (InputStream input = SimpleConsumer.class.getClassLoader().getResourceAsStream("config.properties")) {
-            props = new Properties();
             if (input == null) {
-                log.error("Found no configuration file in resources");
-                throw new Exception("Sorry, unable to find config.properties");
+                log.error("Failed to load configuration file 'config.properties'");
+                throw new IOException("Configuration file 'config.properties' not found");
             }
             props.load(input);
-            String kafkaServers = System.getenv("KAFKA_SERVERS");
-            if (kafkaServers != null) {
+            setBootstrapServers(props);
+        } catch (IOException e) {
+            log.error("Error reading configuration file: ", e);
+            throw e;
+        }
+        return props;
+    }
+
+    private static void setBootstrapServers(Properties props) {
+        if (kafkaServers != null && !kafkaServers.isEmpty()) {
+            props.setProperty("bootstrap.servers", kafkaServers);
+            log.info("Using actively bootstrap servers: {}", kafkaServers);
+        } else {
+            String kafkaServersEnv = System.getenv("KAFKA_SERVERS");
+            if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) {
+                kafkaServers = kafkaServersEnv;
                 props.setProperty("bootstrap.servers", kafkaServers);
-                log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+                log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers);
             } else {
-                log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+                log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file");
             }
-        } catch (IOException e) {
-            log.error("Error reading configuration file: ", e.getMessage());
         }
-        return props;
+    }
+
+    public static void setKafkaServers(String servers) {
+        kafkaServers = servers;
     }
 }
index ba12345..e681714 100644 (file)
@@ -22,6 +22,7 @@ package com.demo.consumer.repository;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -29,9 +30,11 @@ import lombok.ToString;
 
 @ToString
 public class Job {
+    @AllArgsConstructor
     @Builder
     public static class Parameters {
 
+        @AllArgsConstructor
         @Builder
         @EqualsAndHashCode
         public static class KafkaDeliveryInfo {
index 13f3180..eab8035 100644 (file)
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import com.demo.consumer.messages.PropertiesHelper;
 import com.demo.consumer.repository.Job.Parameters;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -55,9 +56,20 @@ public class Jobs {
 
     public void addJob(String id, InfoType type, String owner, Parameters parameters) {
         Job job = new Job(id, type, owner, parameters);
+        setKafkaServersEnvironment(job);
         this.put(job);
     }
 
+    private void setKafkaServersEnvironment(Job job) {
+        String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers();
+        if (kafkaServers != null && !kafkaServers.isEmpty()) {
+            PropertiesHelper.setKafkaServers(kafkaServers);
+            logger.info("Setting variable bootStrapServers: {}", kafkaServers);
+        } else {
+            logger.warn("bootStrapServers is not set for job: {}", job.getId());
+        }
+    }
+
     private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
index e05883e..7ddc908 100755 (executable)
@@ -82,8 +82,7 @@ curl -X 'PUT' \
     "title":"STD_Type1_1.0.0",
     "description":"Type 1",
     "topic": "mytopic",
-    "bootStrapServers": "http://kafka-zkless:9092",
-    "numberOfMessages": 0
+    "bootStrapServers": "kafka-zkless:9092"
     }
 }'
 
@@ -119,8 +118,8 @@ curl -X 'PUT' \
   "job_definition": {
     "deliveryInfo": {
       "topic": "mytopic",
-      "bootStrapServers": "http://kafka-zkless:9092",
-      "numberOfMessages": 0
+      "bootStrapServers": "kafka-zkless:9092",
+      "numberOfMessages": 100
     }
   },
   "job_result_uri": "http://kafka-producer:8080/producer/job",
@@ -144,23 +143,20 @@ echo "Getting Consumer Subscription Job infos from ICS"
 curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
 space
 
-#TEST To set kafka broker in the consumer
-curl -X 'POST' \
-  'http://localhost:8081/consumer/job/1' \
+#To Set Kafka Broker in Consumer
+echo "Sending type1 to ICS to use the callback"
+curl -X 'PUT' \
+  'http://localhost:8083/data-producer/v1/info-types/type1' \
   -H 'accept: application/json' \
   -H 'Content-Type: application/json' \
   -d '{
-  "info_type_id": "type1",
-  "job_owner": "demo",
-  "job_definition": {
-    "deliveryInfo": {
-      "topic": "mytopic",
-      "bootStrapServers": "http://kafka-zkless:9092",
-      "numberOfMessages": 0
+  "info_job_data_schema": {
+    "$schema":"http://json-schema.org/draft-07/schema#",
+    "title":"STD_Type1_1.0.0",
+    "description":"Type 1",
+    "topic": "mytopic",
+    "bootStrapServers": "kafka-zkless:9092"
     }
-  },
-  "job_result_uri": "http://kafka-producer:8080/producer/job",
-  "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
 }'
 
 #Using the autostart flag in the application.yaml
@@ -169,7 +165,7 @@ curl -X GET http://localhost:8080/startProducer/mytopic
 space
 
 echo "Start 1 Consumer on mytopic"
-#curl -X GET http://localhost:8081/startConsumer/mytopic
+curl -X GET http://localhost:8081/startConsumer/mytopic
 space
 
 sleep 10