From 3a4561d445fff624a893283f76744dece0fe3acf Mon Sep 17 00:00:00 2001 From: lapentafd Date: Thu, 9 May 2024 04:50:54 +0100 Subject: [PATCH] Sample producer consumer to get kafka broker from ICS Issue-ID: NONRTRIC-965 Change-Id: I3f3686afaf5d7cc5193142a5b69ac558ea1d5e90 Signed-off-by: lapentafd --- sample-services/ics-producer-consumer/.gitignore | 1 + .../demo/producer/messages/PropertiesHelper.java | 37 ++++++++++----- .../java/com/demo/producer/repository/Jobs.java | 12 +++++ .../producer/src/main/resources/application.yaml | 2 +- sample-services/ics-producer-consumer/start.sh | 52 +++++++++++++++------- 5 files changed, 76 insertions(+), 28 deletions(-) diff --git a/sample-services/ics-producer-consumer/.gitignore b/sample-services/ics-producer-consumer/.gitignore index 851f236b..48d83d43 100644 --- a/sample-services/ics-producer-consumer/.gitignore +++ b/sample-services/ics-producer-consumer/.gitignore @@ -21,6 +21,7 @@ logs/ *.zip *.tar.gz *.rar +*.tgz # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java index 7dc2b1ef..d7d9f98d 100644 --- a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java @@ -33,26 +33,41 @@ import com.demo.producer.producer.SimpleProducer; @Component public class PropertiesHelper { private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class); + private static String kafkaServers = null; public static Properties getProperties() throws Exception { - Properties props = null; + Properties props = new Properties(); try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) { - props = new Properties(); if (input == null) { - log.error("Found no configuration file in resources"); - throw new Exception("Sorry, unable to find config.properties"); + log.error("Failed to load configuration file 'config.properties'"); + throw new IOException("Configuration file 'config.properties' not found"); } props.load(input); - String kafkaServers = System.getenv("KAFKA_SERVERS"); - if (kafkaServers != null) { + setBootstrapServers(props); + } catch (IOException e) { + log.error("Error reading configuration file: ", e); + throw e; + } + return props; + } + + private static void setBootstrapServers(Properties props) { + if (kafkaServers != null && !kafkaServers.isEmpty()) { + props.setProperty("bootstrap.servers", kafkaServers); + log.info("Using actively bootstrap servers: {}", kafkaServers); + } else { + String kafkaServersEnv = System.getenv("KAFKA_SERVERS"); + if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) { + kafkaServers = kafkaServersEnv; props.setProperty("bootstrap.servers", kafkaServers); - log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers); + log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers); } else { - log.info("Env variable KAFKA_SERVERS not found, defaulting to config file"); + log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file"); } - } catch (IOException e) { - log.error("Error reading configuration file: ", e.getMessage()); } - return props; + } + + public static void setKafkaServers(String servers) { + kafkaServers = servers; } } diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java index f9cffd8e..e929e441 100644 --- a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import com.demo.producer.messages.PropertiesHelper; import com.demo.producer.repository.Job.Parameters; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -55,9 +56,20 @@ public class Jobs { public void addJob(String id, InfoType type, String owner, Parameters parameters) { Job job = new Job(id, type, owner, parameters); + setKafkaServersEnvironment(job); this.put(job); } + private void setKafkaServersEnvironment(Job job) { + String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers(); + if (kafkaServers != null && !kafkaServers.isEmpty()) { + PropertiesHelper.setKafkaServers(kafkaServers); + logger.info("Setting variable bootStrapServers: {}", kafkaServers); + } else { + logger.warn("bootStrapServers is not set for job: {}", job.getId()); + } + } + private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml index 31966dff..28ae26c1 100644 --- a/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml +++ b/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml @@ -16,7 +16,7 @@ # vars: time: 1000 - autostart: true + autostart: false topic: mytopic #This topic is used only in autostart spring: diff --git a/sample-services/ics-producer-consumer/start.sh b/sample-services/ics-producer-consumer/start.sh index 94da48e9..e05883e9 100755 --- a/sample-services/ics-producer-consumer/start.sh +++ b/sample-services/ics-producer-consumer/start.sh @@ -71,17 +71,6 @@ wait_for_container "kafka-consumer" "Started Application" echo "Kafka container is up and running. Starting producer and consumer..." space -#Using the autostart flag in the application.yaml -echo "Start 1 Producer on mytopic" -#curl -X GET http://localhost:8080/startProducer/mytopic -space - -echo "Start 1 Consumer on mytopic" -#curl -X GET http://localhost:8081/startConsumer/mytopic -space - -sleep 10 - echo "Sending type1 to ICS" curl -X 'PUT' \ 'http://localhost:8083/data-producer/v1/info-types/type1' \ @@ -92,8 +81,10 @@ curl -X 'PUT' \ "$schema":"http://json-schema.org/draft-07/schema#", "title":"STD_Type1_1.0.0", "description":"Type 1", - "type":"object" - } + "topic": "mytopic", + "bootStrapServers": "http://kafka-zkless:9092", + "numberOfMessages": 0 + } }' echo "Getting types from ICS" @@ -146,14 +137,43 @@ curl -X 'PUT' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ - "status_result_uri": "http://kafka-consumer:8081/info-type-status", - "owner": "owner" + "status_result_uri": "http://kafka-consumer:8081/consumer/info-type-status", + "owner": "demo" }' echo "Getting Consumer Subscription Job infos from ICS" curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json' space -sleep 5 +#TEST To set kafka broker in the consumer +curl -X 'POST' \ + 'http://localhost:8081/consumer/job/1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "info_type_id": "type1", + "job_owner": "demo", + "job_definition": { + "deliveryInfo": { + "topic": "mytopic", + "bootStrapServers": "http://kafka-zkless:9092", + "numberOfMessages": 0 + } + }, + "job_result_uri": "http://kafka-producer:8080/producer/job", + "status_notification_uri": "http://kafka-producer:8080/producer/supervision" +}' + +#Using the autostart flag in the application.yaml +echo "Start 1 Producer on mytopic" +curl -X GET http://localhost:8080/startProducer/mytopic +space + +echo "Start 1 Consumer on mytopic" +#curl -X GET http://localhost:8081/startConsumer/mytopic +space + +sleep 10 + echo "ICS Producer Docker logs " docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions' space -- 2.16.6