From: John Keeney Date: Tue, 14 May 2024 16:23:59 +0000 (+0000) Subject: Merge "Sample consumer to get kafka broker from ICS" X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=6e5c5f7ae965639acda093998527448c35c664ac;hp=46f5295920d6e1909dedc056f5169c14a70f528d;p=nonrtric.git Merge "Sample consumer to get kafka broker from ICS" --- diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java index 63cc2157..30225c73 100644 --- a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java @@ -32,8 +32,9 @@ import org.springframework.web.bind.annotation.RestController; import com.demo.consumer.repository.InfoType; import com.demo.consumer.repository.InfoTypes; import com.demo.consumer.repository.Job.Parameters; +import com.demo.consumer.repository.Job.Parameters.KafkaDeliveryInfo; import com.demo.consumer.dme.ConsumerJobInfo; -import com.demo.consumer.dme.ConsumerStatusInfo; +import com.demo.consumer.dme.JobDataSchema; import com.demo.consumer.repository.Jobs; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -68,14 +69,22 @@ public class ConsumerController { this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner, toJobParameters(request.jobDefinition)); } catch (Exception e) { - log.error("Error adding the job" + infoJobId, e.getMessage()); + log.error("Error adding the job " + infoJobId + "{}", e.getMessage()); } } @PostMapping("/info-type-status") public void statusChange(@RequestBody String requestBody) { - ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class); - log.info("Add Status Job Info", request); + JobDataSchema request = gson.fromJson(requestBody, JobDataSchema.class); + log.debug("Body Received: {}" , requestBody); + try { + this.jobs.addJob(request.getInfo_type_id(), types.getType(request.getInfo_type_id()), "", + new Parameters(new KafkaDeliveryInfo( + request.getJob_data_schema().getTopic(), + request.getJob_data_schema().getBootStrapServers(), 0))); + } catch (Exception e) { + log.error("Error adding the info type " + request.getInfo_type_id() + "{}", e.getMessage()); + } } private Parameters toJobParameters(Object jobData) { diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/JobDataSchema.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/JobDataSchema.java new file mode 100644 index 00000000..678f5365 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/JobDataSchema.java @@ -0,0 +1,59 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.dme; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +import com.demo.consumer.repository.Job.Parameters.KafkaDeliveryInfo; +import com.google.gson.Gson; + +@Data +public class JobDataSchema { + + public enum InfoJobStatusValues { + REGISTERED, UNREGISTERED + } + @SerializedName("info_type_id") + private String info_type_id; + @SerializedName("job_data_schema") + private DataSchema job_data_schema; + @SerializedName("status") + private InfoJobStatusValues status; + + @Override + public String toString() { + return new Gson().toJson(this); + } + + @Getter + @Setter + public class DataSchema { + private String title; + private String description; + @SerializedName("topic") + private String topic; + @SerializedName("bootStrapServers") + private String bootStrapServers; + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java index 18be2f8e..c4bd436c 100644 --- a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java @@ -33,26 +33,41 @@ import com.demo.consumer.consumer.SimpleConsumer; @Component public class PropertiesHelper { private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class); + private static String kafkaServers = null; public static Properties getProperties() throws Exception { - Properties props = null; + Properties props = new Properties(); try (InputStream input = SimpleConsumer.class.getClassLoader().getResourceAsStream("config.properties")) { - props = new Properties(); if (input == null) { - log.error("Found no configuration file in resources"); - throw new Exception("Sorry, unable to find config.properties"); + log.error("Failed to load configuration file 'config.properties'"); + throw new IOException("Configuration file 'config.properties' not found"); } props.load(input); - String kafkaServers = System.getenv("KAFKA_SERVERS"); - if (kafkaServers != null) { + setBootstrapServers(props); + } catch (IOException e) { + log.error("Error reading configuration file: ", e); + throw e; + } + return props; + } + + private static void setBootstrapServers(Properties props) { + if (kafkaServers != null && !kafkaServers.isEmpty()) { + props.setProperty("bootstrap.servers", kafkaServers); + log.info("Using actively bootstrap servers: {}", kafkaServers); + } else { + String kafkaServersEnv = System.getenv("KAFKA_SERVERS"); + if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) { + kafkaServers = kafkaServersEnv; props.setProperty("bootstrap.servers", kafkaServers); - log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers); + log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers); } else { - log.info("Env variable KAFKA_SERVERS not found, defaulting to config file"); + log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file"); } - } catch (IOException e) { - log.error("Error reading configuration file: ", e.getMessage()); } - return props; + } + + public static void setKafkaServers(String servers) { + kafkaServers = servers; } } diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java index ba123458..e681714c 100644 --- a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java @@ -22,6 +22,7 @@ package com.demo.consumer.repository; import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -29,9 +30,11 @@ import lombok.ToString; @ToString public class Job { + @AllArgsConstructor @Builder public static class Parameters { + @AllArgsConstructor @Builder @EqualsAndHashCode public static class KafkaDeliveryInfo { diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java index 13f31806..eab80353 100644 --- a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import com.demo.consumer.messages.PropertiesHelper; import com.demo.consumer.repository.Job.Parameters; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -55,9 +56,20 @@ public class Jobs { public void addJob(String id, InfoType type, String owner, Parameters parameters) { Job job = new Job(id, type, owner, parameters); + setKafkaServersEnvironment(job); this.put(job); } + private void setKafkaServersEnvironment(Job job) { + String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers(); + if (kafkaServers != null && !kafkaServers.isEmpty()) { + PropertiesHelper.setKafkaServers(kafkaServers); + logger.info("Setting variable bootStrapServers: {}", kafkaServers); + } else { + logger.warn("bootStrapServers is not set for job: {}", job.getId()); + } + } + private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml index eed83264..1925f5c9 100644 --- a/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml +++ b/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml @@ -19,7 +19,7 @@ server: vars: time: 1000 - autostart: true + autostart: false topic: mytopic #This topic is used only in autostart spring: diff --git a/sample-services/ics-producer-consumer/start.sh b/sample-services/ics-producer-consumer/start.sh index e05883e9..7ddc9080 100755 --- a/sample-services/ics-producer-consumer/start.sh +++ b/sample-services/ics-producer-consumer/start.sh @@ -82,8 +82,7 @@ curl -X 'PUT' \ "title":"STD_Type1_1.0.0", "description":"Type 1", "topic": "mytopic", - "bootStrapServers": "http://kafka-zkless:9092", - "numberOfMessages": 0 + "bootStrapServers": "kafka-zkless:9092" } }' @@ -119,8 +118,8 @@ curl -X 'PUT' \ "job_definition": { "deliveryInfo": { "topic": "mytopic", - "bootStrapServers": "http://kafka-zkless:9092", - "numberOfMessages": 0 + "bootStrapServers": "kafka-zkless:9092", + "numberOfMessages": 100 } }, "job_result_uri": "http://kafka-producer:8080/producer/job", @@ -144,23 +143,20 @@ echo "Getting Consumer Subscription Job infos from ICS" curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json' space -#TEST To set kafka broker in the consumer -curl -X 'POST' \ - 'http://localhost:8081/consumer/job/1' \ +#To Set Kafka Broker in Consumer +echo "Sending type1 to ICS to use the callback" +curl -X 'PUT' \ + 'http://localhost:8083/data-producer/v1/info-types/type1' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "info_type_id": "type1", - "job_owner": "demo", - "job_definition": { - "deliveryInfo": { - "topic": "mytopic", - "bootStrapServers": "http://kafka-zkless:9092", - "numberOfMessages": 0 + "info_job_data_schema": { + "$schema":"http://json-schema.org/draft-07/schema#", + "title":"STD_Type1_1.0.0", + "description":"Type 1", + "topic": "mytopic", + "bootStrapServers": "kafka-zkless:9092" } - }, - "job_result_uri": "http://kafka-producer:8080/producer/job", - "status_notification_uri": "http://kafka-producer:8080/producer/supervision" }' #Using the autostart flag in the application.yaml @@ -169,7 +165,7 @@ curl -X GET http://localhost:8080/startProducer/mytopic space echo "Start 1 Consumer on mytopic" -#curl -X GET http://localhost:8081/startConsumer/mytopic +curl -X GET http://localhost:8081/startConsumer/mytopic space sleep 10