X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sample-services%2Fics-producer-consumer%2Fproducer%2Fsrc%2Fmain%2Fjava%2Fcom%2Fdemo%2Fproducer%2Fproducer%2FSimpleProducer.java;fp=sample-services%2Fics-producer-consumer%2Fproducer%2Fsrc%2Fmain%2Fjava%2Fcom%2Fdemo%2Fproducer%2Fproducer%2FSimpleProducer.java;h=6c9858b14603ba0e38526946c51e251a61c57970;hb=6360bbb90944220eef2f0b8f03623ae40c9646cd;hp=0000000000000000000000000000000000000000;hpb=0f1c16fd071c70215eed25fa45ecce4803c83d72;p=nonrtric.git diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java new file mode 100644 index 00000000..6c9858b1 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java @@ -0,0 +1,112 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.producer.producer; + +import java.util.UUID; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.json.simple.JSONObject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import com.demo.producer.messages.AbstractSimpleKafka; +import com.demo.producer.messages.KafkaMessageHandler; +import com.demo.producer.messages.MessageHelper; +import com.demo.producer.messages.PropertiesHelper; + +import lombok.Getter; +import lombok.Setter; + +@Component +@Getter +@Setter +public class SimpleProducer extends AbstractSimpleKafka { + private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class); + + @Value("${vars.time:1000}") + private int TIME; + + private KafkaProducer kafkaProducer; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception { + for (int i = 0; i < numberOfMessages; i++) { + String key = UUID.randomUUID().toString(); + String message = MessageHelper.getRandomString(); + if (this.getTopicName() == null) { + this.setTopicName(topicName); + } + this.send(topicName, key, message); + Thread.sleep(TIME); + } + this.shutdown(); + } + + public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception { + while (true) { + String key = UUID.randomUUID().toString(); + String message = MessageHelper.getRandomString(); + this.send(topicName, key, message); + Thread.sleep(TIME); + } + } + + private String topicName = null; + + private void setTopicName(String topicName) { + this.topicName = topicName; + } + + private String getTopicName() { + return this.topicName; + } + + protected void send(String topicName, String key, String message) throws Exception { + String source = SimpleProducer.class.getName(); + ProducerRecord producerRecord = new ProducerRecord<>(topicName, key, message); + JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, key, message); + log.info(obj.toJSONString()); + getKafkaProducer().send(producerRecord); + } + + private KafkaProducer getKafkaProducer() throws Exception { + if (this.kafkaProducer == null) { + Properties props = PropertiesHelper.getProperties(); + this.kafkaProducer = new KafkaProducer<>(props); + } + return this.kafkaProducer; + } + + public void shutdown(){ + closed.set(true); + try { + log.info(MessageHelper.getSimpleJSONObject("Shutting down producer").toJSONString()); + getKafkaProducer().close(); + } catch (Exception e) { + log.error("Error shutting down the Producer ", e.getMessage()); + } + + } +}