From 6360bbb90944220eef2f0b8f03623ae40c9646cd Mon Sep 17 00:00:00 2001 From: lapentafd Date: Fri, 5 Apr 2024 09:49:44 +0100 Subject: [PATCH] ICS sample producer and consumer Sample Java producer and consumer that integrates with ICS callbacks. Issue-ID: NONRTRIC-965 Change-Id: I7319b46802444af130a3bd0d5c6bdd12f97c9904 Signed-off-by: lapentafd --- sample-services/ics-producer-consumer/.gitignore | 26 +++ sample-services/ics-producer-consumer/README.md | 102 +++++++++++ .../ics-producer-consumer/application.yaml | 83 +++++++++ .../ics-producer-consumer/consumer/Dockerfile | 49 ++++++ .../ics-producer-consumer/consumer/Makefile | 51 ++++++ .../consumer/container-tag.yaml | 23 +++ .../ics-producer-consumer/consumer/pom.xml | 92 ++++++++++ .../main/java/com/demo/consumer/Application.java | 57 +++++++ .../com/demo/consumer/consumer/SimpleConsumer.java | 144 ++++++++++++++++ .../consumer/controllers/ConsumerController.java | 85 +++++++++ .../consumer/controllers/ThreadsController.java | 93 ++++++++++ .../com/demo/consumer/dme/ConsumerJobInfo.java | 52 ++++++ .../com/demo/consumer/dme/ConsumerStatusInfo.java | 47 +++++ .../consumer/messages/AbstractSimpleKafka.java | 42 +++++ .../messages/ApplicationMessageHandlerImpl.java | 39 +++++ .../consumer/messages/KafkaMessageHandler.java | 28 +++ .../consumer/messages/KafkaMessageHandlerImpl.java | 37 ++++ .../com/demo/consumer/messages/MessageHelper.java | 73 ++++++++ .../demo/consumer/messages/PropertiesHelper.java | 58 +++++++ .../com/demo/consumer/repository/InfoType.java | 49 ++++++ .../com/demo/consumer/repository/InfoTypes.java | 81 +++++++++ .../java/com/demo/consumer/repository/Job.java | 71 ++++++++ .../java/com/demo/consumer/repository/Jobs.java | 83 +++++++++ .../consumer/src/main/resources/application.yaml | 31 ++++ .../consumer/src/main/resources/config.properties | 42 +++++ .../consumer/src/main/resources/logback.xml | 45 +++++ .../java/com/demo/consumer/SimpleConsumerTest.java | 86 ++++++++++ .../ics-producer-consumer/docker-compose.yaml | 83 +++++++++ .../ics-producer-consumer/docker-compose/.env | 24 +++ .../docker-compose/LICENSE.txt | 14 ++ .../ics-producer-consumer/docker-compose/README.md | 35 ++++ .../docker-compose/control-panel/config/nginx.conf | 45 +++++ .../control-panel/docker-compose.yaml | 32 ++++ .../docker-compose/docker-compose.yaml | 20 +++ .../config/application-nonrtricgateway.yaml | 48 ++++++ .../nonrtric-gateway/docker-compose.yaml | 33 ++++ .../docker-composeRedPanda.yaml | 38 +++++ .../ics-producer-consumer/producer/Dockerfile | 49 ++++++ .../ics-producer-consumer/producer/Makefile | 51 ++++++ .../producer/container-tag.yaml | 23 +++ .../ics-producer-consumer/producer/pom.xml | 92 ++++++++++ .../main/java/com/demo/producer/Application.java | 56 ++++++ .../producer/controllers/ProducerController.java | 137 +++++++++++++++ .../producer/controllers/ThreadsController.java | 93 ++++++++++ .../demo/producer/dme/ProducerInfoTypeInfo.java | 41 +++++ .../com/demo/producer/dme/ProducerJobInfo.java | 53 ++++++ .../producer/dme/ProducerRegistrationInfo.java | 47 +++++ .../producer/messages/AbstractSimpleKafka.java | 43 +++++ .../messages/ApplicationMessageHandlerImpl.java | 39 +++++ .../producer/messages/KafkaMessageHandler.java | 28 +++ .../producer/messages/KafkaMessageHandlerImpl.java | 37 ++++ .../com/demo/producer/messages/MessageHelper.java | 71 ++++++++ .../demo/producer/messages/PropertiesHelper.java | 58 +++++++ .../com/demo/producer/producer/SimpleProducer.java | 112 ++++++++++++ .../com/demo/producer/repository/InfoType.java | 49 ++++++ .../com/demo/producer/repository/InfoTypes.java | 81 +++++++++ .../java/com/demo/producer/repository/Job.java | 71 ++++++++ .../java/com/demo/producer/repository/Jobs.java | 83 +++++++++ .../producer/src/main/resources/application.yaml | 28 +++ .../producer/src/main/resources/config.properties | 44 +++++ .../producer/src/main/resources/logback.xml | 44 +++++ .../java/com/demo/producer/SimpleProducerTest.java | 105 ++++++++++++ .../com/demo/producer/ThreadsControllerTest.java | 89 ++++++++++ sample-services/ics-producer-consumer/red.sh | 189 +++++++++++++++++++++ .../ics-producer-consumer/runconsumer.sh | 2 + .../ics-producer-consumer/runproducer.sh | 2 + sample-services/ics-producer-consumer/start.sh | 180 ++++++++++++++++++++ sample-services/ics-producer-consumer/stop.sh | 27 +++ sample-services/ics-producer-consumer/utils.sh | 98 +++++++++++ 69 files changed, 4163 insertions(+) create mode 100644 sample-services/ics-producer-consumer/.gitignore create mode 100644 sample-services/ics-producer-consumer/README.md create mode 100644 sample-services/ics-producer-consumer/application.yaml create mode 100644 sample-services/ics-producer-consumer/consumer/Dockerfile create mode 100644 sample-services/ics-producer-consumer/consumer/Makefile create mode 100644 sample-services/ics-producer-consumer/consumer/container-tag.yaml create mode 100644 sample-services/ics-producer-consumer/consumer/pom.xml create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties create mode 100644 sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml create mode 100644 sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java create mode 100644 sample-services/ics-producer-consumer/docker-compose.yaml create mode 100644 sample-services/ics-producer-consumer/docker-compose/.env create mode 100644 sample-services/ics-producer-consumer/docker-compose/LICENSE.txt create mode 100644 sample-services/ics-producer-consumer/docker-compose/README.md create mode 100644 sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf create mode 100644 sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml create mode 100644 sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml create mode 100644 sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml create mode 100644 sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml create mode 100644 sample-services/ics-producer-consumer/docker-composeRedPanda.yaml create mode 100644 sample-services/ics-producer-consumer/producer/Dockerfile create mode 100644 sample-services/ics-producer-consumer/producer/Makefile create mode 100644 sample-services/ics-producer-consumer/producer/container-tag.yaml create mode 100644 sample-services/ics-producer-consumer/producer/pom.xml create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java create mode 100644 sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml create mode 100644 sample-services/ics-producer-consumer/producer/src/main/resources/config.properties create mode 100644 sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml create mode 100644 sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java create mode 100644 sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java create mode 100644 sample-services/ics-producer-consumer/red.sh create mode 100644 sample-services/ics-producer-consumer/runconsumer.sh create mode 100644 sample-services/ics-producer-consumer/runproducer.sh create mode 100755 sample-services/ics-producer-consumer/start.sh create mode 100644 sample-services/ics-producer-consumer/stop.sh create mode 100644 sample-services/ics-producer-consumer/utils.sh diff --git a/sample-services/ics-producer-consumer/.gitignore b/sample-services/ics-producer-consumer/.gitignore new file mode 100644 index 00000000..851f236b --- /dev/null +++ b/sample-services/ics-producer-consumer/.gitignore @@ -0,0 +1,26 @@ +.idea +.DS_Store +target +logging.log + +*.class +logs/ +.vscode +*.log + +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* diff --git a/sample-services/ics-producer-consumer/README.md b/sample-services/ics-producer-consumer/README.md new file mode 100644 index 00000000..1731c172 --- /dev/null +++ b/sample-services/ics-producer-consumer/README.md @@ -0,0 +1,102 @@ + +# Automatic +### Using Kafka with a Java Producer and Consumer + +Run the demo script +It will check prerequisites, build a consumer and producer, and run them with kafka and ICS. + +```shell +./start.sh +``` +Or run the other script to bring up RedPanda on port 8888 and NONRTRIC control panel UI on port 8181 + +```shell +./red.sh +``` + +For a faster execution you can add: + +--skip-build to skip creating the app jar and building the docker images + +--no-console to skip running RedPanda and NONRTRIC control panel + +```shell +./red.sh --skip-build --no-console +``` +# Manual +### Run Kafka in a container + +```shell +docker-compose up -d kafka-zkless +``` + +### Starting the REST application individually + +In a new terminal window: + +```shell +mvn spring-boot:run +``` + +### Starting a producer + +```shell +sh ./runproducer.sh +``` + +### Starting a Consumer + +```shell +sh ./runproducer.sh +``` + +## Reading the logs + +A sample of the output is as follows: + +``` +Demo Producer Docker logs + +2024-04-02 12:48:05 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"ygHwxXSIxW","key":"f8f1a7a7-a78e-4c7d-9b8d-108bb0cc9e2c"} +2024-04-02 12:48:06 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"KNIbP10zfN","key":"b058d00f-bbcd-4d2c-936b-6327847d4c2a"} +2024-04-02 12:48:07 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"V6fH1NkdeH","key":"ae1a83a3-d8a7-40c8-9d98-529230f8b585"} +2024-04-02 12:48:08 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"m76qvRFh6f","key":"abccde52-fa72-4fd4-99ab-5bc21514d825"} +2024-04-02 12:48:09 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"t7FJYnFr43","key":"0602239e-34e9-45a6-a04a-3c67b4c7d9e4"} + +++++++++++++++++++++++++++++++++++++++++++++++++++++ + +Demo Consumer Docker logs + +2024-04-02 12:48:05 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: ygHwxXSIxW"} +2024-04-02 12:48:06 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: KNIbP10zfN"} +2024-04-02 12:48:07 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: V6fH1NkdeH"} +2024-04-02 12:48:08 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: m76qvRFh6f"} +2024-04-02 12:48:09 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: t7FJYnFr43"} + +++++++++++++++++++++++++++++++++++++++++++++++++++++ + +ICS logs + +2024-04-02T12:48:05.615Z DEBUG 1 --- [or-http-epoll-2] o.o.i.c.r1producer.ProducerCallbacks : Job subscription 1 started OK 1 +2024-04-02T12:48:05.820Z DEBUG 1 --- [io-8083-exec-10] o.o.i.repository.InfoTypeSubscriptions : Added type status subscription 1 +``` + +The script will fail (exit 1) if there are anny ERRORS logged in the kafka-producer and kafka-consumer. \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/application.yaml b/sample-services/ics-producer-consumer/application.yaml new file mode 100644 index 00000000..cfb4f78d --- /dev/null +++ b/sample-services/ics-producer-consumer/application.yaml @@ -0,0 +1,83 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2019-2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +spring: + profiles: + active: prod + main: + allow-bean-definition-overriding: true + aop: + auto: false +springdoc: + show-actuator: true +management: + endpoints: + web: + exposure: + # Enabling of springboot actuator features. See springboot documentation. + include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown" + endpoint: + shutdown: + enabled: true +lifecycle: + timeout-per-shutdown-phase: "20s" +logging: + # Configuration of logging + level: + ROOT: INFO + org.springframework: INFO + org.springframework.data: INFO + org.springframework.web.reactive.function.client.ExchangeFunctions: INFO + org.oransc.ics: DEBUG + file: + name: /var/log/information-coordinator-service/application.log +server: + # Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework. + # See springboot documentation. + port : 8434 + http-port: 8083 + ssl: + key-store-type: JKS + key-store-password: policy_agent + key-store: /opt/app/information-coordinator-service/etc/cert/keystore.jks + key-password: policy_agent + key-alias: policy_agent + shutdown: "graceful" +app: + webclient: + # Configuration of the trust store used for the HTTP client (outgoing requests) + # The file location and the password for the truststore is only relevant if trust-store-used == true + # Note that the same keystore as for the server is used. + trust-store-used: false + trust-store-password: policy_agent + trust-store: /opt/app/information-coordinator-service/etc/cert/truststore.jks + # Configuration of usage of HTTP Proxy for the southbound accesses. + # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s + http.proxy-host: + http.proxy-port: 0 + vardata-directory: /var/information-coordinator-service + # If the file name is empty, no authorization token is used + auth-token-file: + # A URL to authorization provider such as OPA. Each time a information job is accessed, a call to this + # authorization provider is done for access control. If this is empty, no fine grained access control is done. + info-job-authorization-agent: + # S3 object store usage is enabled by defining the bucket to use. This will override the vardata-directory parameter. + s3: + endpointOverride: http://localhost:9000 + accessKeyId: minio + secretAccessKey: miniostorage + bucket: \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/consumer/Dockerfile b/sample-services/ics-producer-consumer/consumer/Dockerfile new file mode 100644 index 00000000..d5d6f2aa --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/Dockerfile @@ -0,0 +1,49 @@ +#================================================================================== +# Copyright (C) 2024: OpenInfra Foundation Europe +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This source code is part of the near-RT RIC (RAN Intelligent Controller) +# platform project (RICP). +#================================================================================== + +# Use Maven image with OpenJDK 17 for the build stage +FROM maven:3.8.5-openjdk-17 AS maven_build + +# Copy Maven project files +COPY pom.xml /tmp/ +COPY src /tmp/src/ + +# Set working directory +WORKDIR /tmp/ + +# Build the Maven project +RUN mvn package + +# Use a separate image with OpenJDK 17 for the runtime stage +FROM openjdk:17-jdk-slim + +# Expose port 8081 +EXPOSE 8081 + +ARG KAFKA_SERVERS +ENV KAFKA_SERVERS=${KAFKA_SERVERS} + +# Set the working directory +WORKDIR /app + +# Copy the JAR file from the maven_build stage to the runtime stage +COPY --from=maven_build /tmp/target/consumer-0.0.1.jar /app/consumer-0.0.1.jar + +# Command to run the application +CMD ["java", "-jar", "consumer-0.0.1.jar"] diff --git a/sample-services/ics-producer-consumer/consumer/Makefile b/sample-services/ics-producer-consumer/consumer/Makefile new file mode 100644 index 00000000..4e87cd31 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/Makefile @@ -0,0 +1,51 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +# Define variables +IMAGE_NAME = "o-ran-sc/nonrtric-sample-icsconsumer" +DOCKERFILE = Dockerfile + +# Default target +.PHONY: all +all: build run + +# Target to build the Maven JAR +.PHONY: jar +jar: + mvn clean package + +# Target to build the Docker image +.PHONY: build +build: + docker build -t $(IMAGE_NAME) -f $(DOCKERFILE) . + +# Target to run the Docker container +.PHONY: run +run: + docker run -p 8081:8081 $(IMAGE_NAME) + +# Target to stop and remove the Docker container +.PHONY: stop +stop: + docker stop $(IMAGE_NAME) || true + docker rm $(IMAGE_NAME) || true + +# Target to clean up +.PHONY: clean +clean: stop + docker rmi $(IMAGE_NAME) || true diff --git a/sample-services/ics-producer-consumer/consumer/container-tag.yaml b/sample-services/ics-producer-consumer/consumer/container-tag.yaml new file mode 100644 index 00000000..48290af7 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/container-tag.yaml @@ -0,0 +1,23 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +# The Jenkins job requires a tag to build the Docker image. +# By default this file is in the docker build directory, +# but the location can configured in the JJB template. +--- +tag: 0.0.1 \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/consumer/pom.xml b/sample-services/ics-producer-consumer/consumer/pom.xml new file mode 100644 index 00000000..84a6ae81 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.2.3 + + + + com.demo.consumer + consumer + 0.0.1 + jar + + + 17 + 17 + + + + + + org.apache.kafka + kafka-clients + 3.7.0 + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-test + test + + + org.projectlombok + lombok + 1.18.24 + provided + + + com.google.code.gson + gson + 2.10.1 + + + org.springframework.boot + spring-boot-starter + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.demo.consumer.Application + + + + + diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java new file mode 100644 index 00000000..0ba278be --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java @@ -0,0 +1,57 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.EnableAsync; + +import com.demo.consumer.consumer.SimpleConsumer; +import com.demo.consumer.messages.KafkaMessageHandlerImpl; + +@SpringBootApplication +@EnableAsync +public class Application { + + @Autowired + private SimpleConsumer simpleConsumer; + + @Value("${vars.autostart:false}") + private boolean autostart; + + @Value("${vars.topic}") + private String topic; + + public static void main(String[] args) throws Exception { + SpringApplication.run(Application.class, args); + } + + @EventListener(ApplicationReadyEvent.class) + public void checkAutoRun() throws Exception{ + if (autostart) { + simpleConsumer.runAlways(topic, new KafkaMessageHandlerImpl()); + } + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java new file mode 100644 index 00000000..430ab597 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java @@ -0,0 +1,144 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import com.demo.consumer.messages.AbstractSimpleKafka; +import com.demo.consumer.messages.KafkaMessageHandler; +import com.demo.consumer.messages.MessageHelper; +import com.demo.consumer.messages.PropertiesHelper; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Properties; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +@Component +@Setter +@Getter +public class SimpleConsumer extends AbstractSimpleKafka { + private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class); + + private KafkaConsumer kafkaConsumer = null; + private final AtomicBoolean closed = new AtomicBoolean(false); + + @Value("${vars.time:2000}") + private int TIME_OUT_MS; + + public void run(String topicName, int numberOfRecords, KafkaMessageHandler callback) throws Exception { + Properties props = PropertiesHelper.getProperties(); + // See if the number of records is provided + Optional recs = Optional.ofNullable(numberOfRecords); + + // adjust the number of records to get accordingly + Integer numOfRecs = recs.orElseGet(() -> Integer.parseInt(props.getProperty("max.poll.records"))); + props.setProperty("max.poll.records", String.valueOf(numOfRecs)); + + // create the consumer + KafkaConsumer consumer = new KafkaConsumer<>(props); + + // make the consumer available for graceful shutdown + setKafkaConsumer(consumer); + consumer.assign(Collections.singleton(new TopicPartition(topicName, 0))); + //consumer.seekToBeginning(consumer.assignment()); //--from-beginning + int recNum = numOfRecs; + while (recNum > 0) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(TIME_OUT_MS)); + recNum = records.count(); + if (recNum == 0) { + log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString()); + break; + } + + for (ConsumerRecord record : records) { + callback.processMessage(topicName, record); + recNum--; + } + } + consumer.close(); + } + + private void close() throws Exception { + if (this.getKafkaConsumer() == null) { + log.info(MessageHelper.getSimpleJSONObject("The internal consumer is NULL").toJSONString()); + return; + } + log.info(MessageHelper.getSimpleJSONObject("Closing consumer").toJSONString()); + this.getKafkaConsumer().close(); + } + + public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception { + Properties props = PropertiesHelper.getProperties(); + // make the consumer available for graceful shutdown + setKafkaConsumer(new KafkaConsumer<>(props)); + + // keep running forever or until shutdown() is called from another thread. + try { + getKafkaConsumer().subscribe(List.of(topicName)); + while (!closed.get()) { + ConsumerRecords records = getKafkaConsumer().poll(Duration.ofMillis(TIME_OUT_MS)); + if (records.count() == 0) { + log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString()); + } + + for (ConsumerRecord record : records) { + callback.processMessage(topicName, record); + log.info(MessageHelper.getSimpleJSONObject("Topic: " + topicName + "Message: " + record.value()).toJSONString()); + } + } + } catch (WakeupException e) { + // Ignore exception if closing + if (!closed.get()) + throw e; + } + } + + public void shutdown() { + closed.set(true); + try { + log.info(MessageHelper.getSimpleJSONObject("Shutting down consumer").toJSONString()); + } catch (Exception e) { + } + getKafkaConsumer().wakeup(); + } + + public KafkaConsumer getKafkaConsumer() { + return kafkaConsumer; + } + + public void setKafkaConsumer(KafkaConsumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java new file mode 100644 index 00000000..63cc2157 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java @@ -0,0 +1,85 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.controllers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.demo.consumer.repository.InfoType; +import com.demo.consumer.repository.InfoTypes; +import com.demo.consumer.repository.Job.Parameters; +import com.demo.consumer.dme.ConsumerJobInfo; +import com.demo.consumer.dme.ConsumerStatusInfo; +import com.demo.consumer.repository.Jobs; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +@RestController +@RequestMapping(path = "/consumer", produces = "application/json") +public class ConsumerController { + private static final Logger log = LoggerFactory.getLogger(ConsumerController.class); + + private static Gson gson = new GsonBuilder().create(); + + private final Jobs jobs; + private final InfoTypes types; + + public ConsumerController(@Autowired Jobs jobs, @Autowired InfoTypes types) { + this.jobs = jobs; + this.types = types; + InfoType type1 = InfoType.builder().build(); + Parameters p = Parameters.builder().build(); + type1.setId("type1"); + type1.setKafkaInputTopic("mytopic"); + type1.setInputJobType("type1"); + type1.setInputJobDefinition(p); + types.put(type1); + } + + @PostMapping("/job/{infoJobId}") + public void startinfojob(@RequestBody String requestBody, @PathVariable String infoJobId) { + ConsumerJobInfo request = gson.fromJson(requestBody, ConsumerJobInfo.class); + log.info("Add Job Info" + infoJobId, request); + try { + this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner, + toJobParameters(request.jobDefinition)); + } catch (Exception e) { + log.error("Error adding the job" + infoJobId, e.getMessage()); + } + } + + @PostMapping("/info-type-status") + public void statusChange(@RequestBody String requestBody) { + ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class); + log.info("Add Status Job Info", request); + } + + private Parameters toJobParameters(Object jobData) { + String json = gson.toJson(jobData); + return gson.fromJson(json, Parameters.class); + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java new file mode 100644 index 00000000..f668e3ad --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java @@ -0,0 +1,93 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.controllers; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import com.demo.consumer.consumer.SimpleConsumer; +import com.demo.consumer.messages.ApplicationMessageHandlerImpl; + +@RestController +public class ThreadsController { + private static final Logger log = LoggerFactory.getLogger(ThreadsController.class); + + @Autowired + private SimpleConsumer simpleConsumer; + + private Thread consumerThread; + + @Async + @GetMapping("/startConsumer/{topicName}") + public CompletableFuture startConsumer(@PathVariable("topicName") String topicName) { + try { + Thread consumerThread = new Thread(() -> { + try { + simpleConsumer.runAlways(topicName, new ApplicationMessageHandlerImpl()); + } catch (Exception e) { + log.error("Error starting consuming on: " + topicName, e.getMessage()); + } + }); + consumerThread.start(); + return CompletableFuture.completedFuture("Consumer started for topic: " + topicName); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + @GetMapping("/stopConsumer") + public String stopConsumer() { + if (consumerThread != null && consumerThread.isAlive()) { + try { + simpleConsumer.shutdown(); + } catch (Exception e) { + log.error("Error stopping consumer Thread", e.getMessage()); + } + return "Consumer stopped successfully"; + } else { + return "No active consumer to stop"; + } + } + + @GetMapping("/listen/{numberOfMessages}/on/{topicName}") + public CompletableFuture listenMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) { + try { + Thread consumerThread = new Thread(() -> { + try { + simpleConsumer.run(topicName, numberOfMessages, new ApplicationMessageHandlerImpl()); + } catch (Exception e) { + log.error("Error starting consuming on: " + topicName, e.getMessage()); + } + }); + consumerThread.start(); + return CompletableFuture.completedFuture("Consumer started for topic: " + topicName); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java new file mode 100644 index 00000000..b3cd4167 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java @@ -0,0 +1,52 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.dme; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +public class ConsumerJobInfo { + + @SerializedName("info_type_id") + @JsonProperty(value = "info_type_id", required = true) + public String infoTypeId = ""; + + @SerializedName("job_owner") + @JsonProperty(value = "job_owner", required = true) + public String owner = ""; + + @SerializedName("job_definition") + @JsonProperty(value = "job_definition", required = true) + public Object jobDefinition; + + @SerializedName("job_result_uri") + @JsonProperty(value = "job_result_uri", required = true) + public String jobResultUri = ""; + + @SerializedName("status_notification_uri") + @JsonProperty(value = "status_notification_uri", required = false) + public String statusNotificationUri = ""; + +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java new file mode 100644 index 00000000..46f1a3fd --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java @@ -0,0 +1,47 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.dme; + +import java.util.Collection; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +public class ConsumerStatusInfo { + + public enum InfoJobStatusValues { + REGISTERED, UNREGISTERED + } + + @SerializedName("info_job_status") + @JsonProperty(value = "info_job_status", required = true) + public InfoJobStatusValues state; + + @SerializedName("producers") + @JsonProperty(value = "producers", required = true) + public Collection producers; + +} + diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java new file mode 100644 index 00000000..4f817a98 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java @@ -0,0 +1,42 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.messages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractSimpleKafka { + private static final Logger log = LoggerFactory.getLogger(AbstractSimpleKafka.class); + + public AbstractSimpleKafka(){ + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + shutdown(); + } + }); + log.info(MessageHelper.getSimpleJSONObject("Created the Shutdown Hook").toJSONString()); + } + + public abstract void shutdown(); + public abstract void runAlways(String topicName, KafkaMessageHandler callback) throws Exception; + public abstract void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception; +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java new file mode 100644 index 00000000..ed4a9306 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java @@ -0,0 +1,39 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.messages; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ApplicationMessageHandlerImpl implements KafkaMessageHandler { + + private static final Logger log = LoggerFactory.getLogger(ApplicationMessageHandlerImpl.class); + + @Override + public void processMessage(String topicName, ConsumerRecord message) throws Exception { + String source = KafkaMessageHandlerImpl.class.getName(); + JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value()); + System.out.println(obj.toJSONString()); + log.info(obj.toJSONString()); + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java new file mode 100644 index 00000000..9c16f9e7 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java @@ -0,0 +1,28 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.messages; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +@FunctionalInterface +public interface KafkaMessageHandler { + void processMessage(String topicName, ConsumerRecord message) throws Exception; +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java new file mode 100644 index 00000000..8ff36feb --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java @@ -0,0 +1,37 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.messages; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.json.simple.JSONObject; + +public class KafkaMessageHandlerImpl implements KafkaMessageHandler { + private static final Logger log = LoggerFactory.getLogger(KafkaMessageHandlerImpl.class); + + @Override + public void processMessage(String topicName, ConsumerRecord message) throws Exception { + String source = KafkaMessageHandlerImpl.class.getName(); + JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value()); + log.info(obj.toJSONString()); + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java new file mode 100644 index 00000000..21063cc4 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java @@ -0,0 +1,73 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.messages; + +import java.util.Properties; +import java.util.Random; +import org.json.simple.JSONObject; + +/** + * The type Message helper. + */ +public class MessageHelper { + + private static Properties props; + + public static String getRandomString() { + int leftLimit = 48; // numeral '0' + int rightLimit = 122; // letter 'z' + int targetStringLength = 10; + Random random = new Random(); + + return random.ints(leftLimit, rightLimit + 1).filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97)) + .limit(targetStringLength) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString(); + } + + @SuppressWarnings("unchecked") // Using only strings + public static JSONObject getMessageLogEntryJSON(String source, String topic, String key, String message) + throws Exception { + JSONObject obj = new JSONObject(); + String bootstrapServers = getProperties().getProperty("bootstrap.servers"); + obj.put("bootstrapServers", bootstrapServers); + obj.put("source", source); + obj.put("topic", topic); + obj.put("key", key); + obj.put("message", message); + + return obj; + } + + @SuppressWarnings("unchecked") + public static JSONObject getSimpleJSONObject(String message) { + JSONObject obj = new JSONObject(); + obj.put("message", message); + return obj; + } + + protected static Properties getProperties() throws Exception { + if (props == null) { + props = PropertiesHelper.getProperties(); + } + return props; + } + +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java new file mode 100644 index 00000000..18be2f8e --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java @@ -0,0 +1,58 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.messages; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.demo.consumer.consumer.SimpleConsumer; + +@Component +public class PropertiesHelper { + private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class); + + public static Properties getProperties() throws Exception { + Properties props = null; + try (InputStream input = SimpleConsumer.class.getClassLoader().getResourceAsStream("config.properties")) { + props = new Properties(); + if (input == null) { + log.error("Found no configuration file in resources"); + throw new Exception("Sorry, unable to find config.properties"); + } + props.load(input); + String kafkaServers = System.getenv("KAFKA_SERVERS"); + if (kafkaServers != null) { + props.setProperty("bootstrap.servers", kafkaServers); + log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers); + } else { + log.info("Env variable KAFKA_SERVERS not found, defaulting to config file"); + } + } catch (IOException e) { + log.error("Error reading configuration file: ", e.getMessage()); + } + return props; + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java new file mode 100644 index 00000000..45b4c7cf --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java @@ -0,0 +1,49 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.repository; + +import lombok.AccessLevel; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@ToString +@Builder(access = AccessLevel.PUBLIC) +public class InfoType { + + @Getter + @Setter + private String id; + + @Getter + @Setter + private String kafkaInputTopic; + + @Getter + @Setter + private String inputJobType; + + @Getter + @Setter + private Object inputJobDefinition; + +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java new file mode 100644 index 00000000..f1e83be5 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java @@ -0,0 +1,81 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.repository; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; + +@Component +public class InfoTypes { + private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class); + + private Map allTypes = new HashMap<>(); + + public InfoTypes(Collection types) { + for (InfoType type : types) { + put(type); + } + } + + public synchronized InfoType get(String id) { + return allTypes.get(id); + } + + public synchronized InfoType getType(String id) throws Exception { + InfoType type = allTypes.get(id); + if (type == null) { + throw new Exception("Could not find type: " + id + HttpStatus.NOT_FOUND.toString()); + } + return type; + } + + public static class ConfigFile { + Collection types; + } + + public synchronized void put(InfoType type) { + logger.debug("Put type: {}", type.getId()); + allTypes.put(type.getId(), type); + } + + public synchronized Iterable getAll() { + return new Vector<>(allTypes.values()); + } + + public synchronized Collection typeIds() { + return allTypes.keySet(); + } + + public synchronized int size() { + return allTypes.size(); + } + + public synchronized void clear() { + allTypes.clear(); + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java new file mode 100644 index 00000000..ba123458 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java @@ -0,0 +1,71 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.repository; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +@ToString +public class Job { + @Builder + public static class Parameters { + + @Builder + @EqualsAndHashCode + public static class KafkaDeliveryInfo { + @Getter + private String topic; + + @Getter + private String bootStrapServers; + + @JsonProperty(value = "numberOfMessages") + @Getter + private int numberOfMessages; + } + + @Getter + private KafkaDeliveryInfo deliveryInfo; + } + + @Getter + private final String id; + + @Getter + private final InfoType type; + + @Getter + private final String owner; + + @Getter + private final Parameters parameters; + + public Job(String id, InfoType type, String owner, Parameters parameters) { + this.id = id; + this.type = type; + this.owner = owner; + this.parameters = parameters; + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java new file mode 100644 index 00000000..13f31806 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java @@ -0,0 +1,83 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer.repository; + +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.demo.consumer.repository.Job.Parameters; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +@Component +public class Jobs { + private static final Logger logger = LoggerFactory.getLogger(Jobs.class); + + private Map allJobs = new HashMap<>(); + + public Jobs() { + } + + public synchronized Job getJob(String id) throws Exception { + Job job = allJobs.get(id); + if (job == null) { + throw new Exception("Could not find job: " + id); + } + return job; + } + + public synchronized Job get(String id) { + return allJobs.get(id); + } + + public void addJob(String id, InfoType type, String owner, Parameters parameters) { + Job job = new Job(id, type, owner, parameters); + this.put(job); + } + + private synchronized void put(Job job) { + logger.debug("Put job: {}", job.getId()); + allJobs.put(job.getId(), job); + } + + public synchronized Iterable getAll() { + return new Vector<>(allJobs.values()); + } + + public synchronized int size() { + return allJobs.size(); + } + + public synchronized Job delete(String id) { + return allJobs.remove(id); + } + + @Override + public String toString() { + Gson gson = new GsonBuilder().create(); + return gson.toJson(allJobs); + } +} diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml new file mode 100644 index 00000000..eed83264 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml @@ -0,0 +1,31 @@ +# ========================LICENSE_START================================= +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +server: + port : 8081 + +vars: + time: 1000 + autostart: true + topic: mytopic #This topic is used only in autostart + +spring: + application: + name: demoConsumer + +logging: + level: + root: INFO diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties b/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties new file mode 100644 index 00000000..48763d6e --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties @@ -0,0 +1,42 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +# The location of the Kafka server +bootstrap.servers=${KAFKA_SERVERS:localhost:9092} + +# the default group ID +group.id=test-group + +# the default topic to use if one is not provided +default.topic=magic-topic + +# The number of records to pull of the stream every time +# the client takes a trip out to Kafka +max.poll.records=10 + +# Make Kafka keep track of record reads by the consumer +enable.auto.commit=true + +# The time in milliseconds to Kafka write the offset of the last message read +auto.commit.interval.ms=500 + +# classes for serializing and deserializing messages +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +value.deserializer=org.apache.kafka.common.serialization.StringDeserializer + +enable.idempotence=false diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml b/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml new file mode 100644 index 00000000..dd90b775 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml @@ -0,0 +1,45 @@ + + + + + logging.log + + logging.%i.log.zip + 1 + 10 + + + 10MB + + + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + + + + + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + + + + + + + diff --git a/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java b/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java new file mode 100644 index 00000000..7a7c2799 --- /dev/null +++ b/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java @@ -0,0 +1,86 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.consumer; + +import com.demo.consumer.consumer.SimpleConsumer; +import com.demo.consumer.messages.KafkaMessageHandler; +import com.demo.consumer.messages.PropertiesHelper; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.time.Duration; + +import java.util.Properties; + +import static org.mockito.Mockito.*; + +class SimpleConsumerTest { + + private static final long TIME_OUT_MS = 1000; + + @Mock + private KafkaConsumer kafkaConsumer; + + @InjectMocks + private SimpleConsumer simpleConsumer; + + private AutoCloseable closable; + + @BeforeEach + void setUp() throws Exception { + closable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + void tearDown() throws Exception { + closable.close(); + } + + @Test + void testRun() throws Exception { + // Mocking the properties object returned by PropertiesHelper.getProperties() + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", "localhost:9092"); + properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + // Mocking PropertiesHelper.getProperties() to return the mocked Properties object + try (MockedStatic propertiesHelperMockedStatic = Mockito.mockStatic(PropertiesHelper.class)) { + propertiesHelperMockedStatic.when(PropertiesHelper::getProperties).thenReturn(properties); + + String topicName = "testTopic"; + int numberOfMessages = 10; + KafkaMessageHandler callback = mock(KafkaMessageHandler.class); + + simpleConsumer.run(topicName, numberOfMessages, callback); + verify(kafkaConsumer, times(0)).poll(Duration.ofMillis(TIME_OUT_MS)); + } + } + +} diff --git a/sample-services/ics-producer-consumer/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose.yaml new file mode 100644 index 00000000..d9239bc1 --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose.yaml @@ -0,0 +1,83 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +version: '2' + +networks: + my-network: + name: kafka + driver: bridge + +services: + kafka-zkless: + container_name: kafka-zkless + image: quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 + command: + [ + "sh", + "-c", + "export CLUSTER_ID=$$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $$CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override listener.security.protocol.map=$${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} --override listeners=$${KAFKA_LISTENERS}", + ] + ports: + - "9092:9092" + environment: + LOG_DIR: "/tmp/logs" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-zkless:29092,PLAINTEXT_HOST://kafka-zkless:9092 + tty: true + stdin_open: true + networks: + - my-network + + informationcoordinator: + image: nexus3.o-ran-sc.org:10001/o-ran-sc/nonrtric-plt-informationcoordinatorservice:1.6.0 + container_name: informationcoordinatorservice + ports: + - "8083:8083" + volumes: + - ./application.yaml:/opt/app/information-coordinator-service/config/application.yaml + networks: + - my-network + + kafka-producer: + image: o-ran-sc/nonrtric-sample-icsproducer:latest + container_name: kafka-producer + environment: + - KAFKA_SERVERS=kafka-zkless:9092 + ports: + - "8080:8080" + networks: + - my-network + + kafka-consumer: + image: o-ran-sc/nonrtric-sample-icsconsumer:latest + container_name: kafka-consumer + environment: + - KAFKA_SERVERS=kafka-zkless:9092 + ports: + - "8081:8081" + networks: + - my-network + + curl-client: + image: curlimages/curl:latest + container_name: curl-client + command: ["tail", "-f", "/dev/null"] + networks: + - my-network diff --git a/sample-services/ics-producer-consumer/docker-compose/.env b/sample-services/ics-producer-consumer/docker-compose/.env new file mode 100644 index 00000000..257ac73b --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/.env @@ -0,0 +1,24 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2021 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + +#CONTROL_PANEL +CONTROL_PANEL_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-controlpanel" +CONTROL_PANEL_IMAGE_TAG="2.5.0" + +#NONRTRIC_GATEWAY +NONRTRIC_GATEWAY_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-gateway" +NONRTRIC_GATEWAY_IMAGE_TAG="1.2.0" \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt b/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt new file mode 100644 index 00000000..24966636 --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt @@ -0,0 +1,14 @@ +LICENSE.txt + +Unless otherwise specified, all software contained herein is licensed +under the Apache License, Version 2.0 (the "Software License"); +you may not use this software except in compliance with the Software +License. You may obtain a copy of the Software License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the Software License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the Software License for the specific language governing permissions +and limitations under the Software License. diff --git a/sample-services/ics-producer-consumer/docker-compose/README.md b/sample-services/ics-producer-consumer/docker-compose/README.md new file mode 100644 index 00000000..d7542a8c --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/README.md @@ -0,0 +1,35 @@ +# License + +Copyright (C) 2020 Nordix Foundation. +Licensed under the Apache License, Version 2.0 (the "License") +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +For more information about license please see the [LICENSE](LICENSE.txt) file for details. + +## O-RAN-SC docker-compose files + +The docker compose file helps the user to deploy the components of nonrtric control panel with one command. + +NOTE: +docker image urls & tags are in file ```.env``` + +To install the Control Panel and gateway, run the following command: + +```shell +docker-compose --env-file .env -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml up -d +``` + +To remove the containers, use the command: + +```shell +docker-compose --env-file .env -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml down +``` diff --git a/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf b/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf new file mode 100644 index 00000000..66e8ac4b --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf @@ -0,0 +1,45 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2019-2022 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# + + +events{} + +http { + include /etc/nginx/mime.types; + resolver 127.0.0.11; + server { + listen 8080; + server_name localhost; + root /usr/share/nginx/html; + index index.html; + location /a1-policy/ { + set $upstream nonrtric-gateway; + proxy_pass http://$upstream:9090; + } + location /data-producer/{ + set $upstream nonrtric-gateway; + proxy_pass http://$upstream:9090; + } + location /data-consumer/{ + set $upstream nonrtric-gateway; + proxy_pass http://$upstream:9090; + } + location / { + try_files $uri $uri/ /index.html; + } + } +} diff --git a/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml new file mode 100644 index 00000000..a42b7a84 --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml @@ -0,0 +1,32 @@ +# Copyright (C) 2020 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# +version: '3.5' + +networks: + kafka: + external: true + +services: + policy-control-panel: + image: "${CONTROL_PANEL_IMAGE_BASE}:${CONTROL_PANEL_IMAGE_TAG}" + container_name: policy-control-panel + networks: + - kafka + ports: + - 8181:8080 + - 8282:8082 + volumes: + - ./control-panel/config/nginx.conf:/etc/nginx/nginx.conf:ro diff --git a/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml new file mode 100644 index 00000000..c30aa2cc --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml @@ -0,0 +1,20 @@ +# Copyright (C) 2021 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# +version: '3.5' + +networks: + kafka: + external: true diff --git a/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml new file mode 100644 index 00000000..c620b031 --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml @@ -0,0 +1,48 @@ +################################################################################ +# Copyright (c) 2021 Nordix Foundation. # +# # +# Licensed under the Apache License, Version 2.0 (the "License"); # +# you may not use this file except in compliance with the License. # +# You may obtain a copy of the License at # +# # +# http://www.apache.org/licenses/LICENSE-2.0 # +# # +# Unless required by applicable law or agreed to in writing, software # +# distributed under the License is distributed on an "AS IS" BASIS, # +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# See the License for the specific language governing permissions and # +# limitations under the License. # +################################################################################ + +server: + port: 9090 +spring: + cloud: + gateway: + httpclient: + ssl: + useInsecureTrustManager: true + wiretap: true + httpserver: + wiretap: true + routes: + - id: A1-EI + uri: http://informationcoordinatorservice:8083 + predicates: + - Path=/data-producer/**,/data-consumer/** +management: + endpoint: + gateway: + enabled: true + endpoints: + web: + exposure: + include: "gateway,loggers,logfile,health,info,metrics,threaddump,heapdump" +logging: + level: + ROOT: ERROR + org.springframework: ERROR + org.springframework.cloud.gateway: INFO + reactor.netty: INFO + file: + name: /var/log/nonrtric-gateway/application.log diff --git a/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml new file mode 100644 index 00000000..138229df --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml @@ -0,0 +1,33 @@ +# Copyright (C) 2021 Nordix Foundation. All rights reserved. +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# +version: '3.5' + +networks: + kafka: + external: true + +services: + nonrtric-gateway: + image: "${NONRTRIC_GATEWAY_IMAGE_BASE}:${NONRTRIC_GATEWAY_IMAGE_TAG}" + container_name: nonrtric-gateway + networks: + kafka: + aliases: + - nonrtric-gateway-container + ports: + - 9090:9090 + volumes: + - ./nonrtric-gateway/config/application-nonrtricgateway.yaml:/opt/app/nonrtric-gateway/config/application.yaml:ro diff --git a/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml b/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml new file mode 100644 index 00000000..0184de41 --- /dev/null +++ b/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml @@ -0,0 +1,38 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +version: '2' +networks: + kafka: + external: true + +services: + redpanda-console: + container_name: redpanda-console + image: docker.redpanda.com/redpandadata/console:v2.4.5 + entrypoint: /bin/sh + command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console' + environment: + CONFIG_FILEPATH: /tmp/config.yml + CONSOLE_CONFIG_FILE: | + kafka: + brokers: ["kafka-zkless:9092"] + ports: + - 8888:8080 + networks: + - kafka diff --git a/sample-services/ics-producer-consumer/producer/Dockerfile b/sample-services/ics-producer-consumer/producer/Dockerfile new file mode 100644 index 00000000..148872a4 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/Dockerfile @@ -0,0 +1,49 @@ +#================================================================================== +# Copyright (C) 2024: OpenInfra Foundation Europe +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This source code is part of the near-RT RIC (RAN Intelligent Controller) +# platform project (RICP). +#================================================================================== + +# Use Maven image with OpenJDK 17 for the build stage +FROM maven:3.8.5-openjdk-17 AS maven_build + +# Copy Maven project files +COPY pom.xml /tmp/ +COPY src /tmp/src/ + +# Set working directory +WORKDIR /tmp/ + +# Build the Maven project +RUN mvn package + +# Use a separate image with OpenJDK 17 for the runtime stage +FROM openjdk:17-jdk-slim + +# Expose port 8080 +EXPOSE 8080 + +ARG KAFKA_SERVERS +ENV KAFKA_SERVERS=${KAFKA_SERVERS} + +# Set the working directory +WORKDIR /app + +# Copy the JAR file from the maven_build stage to the runtime stage +COPY --from=maven_build /tmp/target/producer-0.0.1.jar /app/producer-0.0.1.jar + +# Command to run the application +CMD ["java", "-jar", "producer-0.0.1.jar"] diff --git a/sample-services/ics-producer-consumer/producer/Makefile b/sample-services/ics-producer-consumer/producer/Makefile new file mode 100644 index 00000000..301526a9 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/Makefile @@ -0,0 +1,51 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +# Define variables +IMAGE_NAME = "o-ran-sc/nonrtric-sample-icsproducer" +DOCKERFILE = Dockerfile + +# Default target +.PHONY: all +all: build run + +# Target to build the Maven JAR +.PHONY: jar +jar: + mvn clean package + +# Target to build the Docker image +.PHONY: build +build: + docker build -t $(IMAGE_NAME) -f $(DOCKERFILE) . + +# Target to run the Docker container +.PHONY: run +run: + docker run -p 8080:8080 $(IMAGE_NAME) + +# Target to stop and remove the Docker container +.PHONY: stop +stop: + docker stop $(IMAGE_NAME) || true + docker rm $(IMAGE_NAME) || true + +# Target to clean up +.PHONY: clean +clean: stop + docker rmi $(IMAGE_NAME) || true diff --git a/sample-services/ics-producer-consumer/producer/container-tag.yaml b/sample-services/ics-producer-consumer/producer/container-tag.yaml new file mode 100644 index 00000000..48290af7 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/container-tag.yaml @@ -0,0 +1,23 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +# The Jenkins job requires a tag to build the Docker image. +# By default this file is in the docker build directory, +# but the location can configured in the JJB template. +--- +tag: 0.0.1 \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/producer/pom.xml b/sample-services/ics-producer-consumer/producer/pom.xml new file mode 100644 index 00000000..d78f3709 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.2.3 + + + + com.demo.producer + producer + 0.0.1 + jar + + + 17 + 17 + + + + + + org.apache.kafka + kafka-clients + 3.7.0 + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-test + test + + + org.projectlombok + lombok + 1.18.24 + provided + + + com.google.code.gson + gson + 2.10.1 + + + org.springframework.boot + spring-boot-starter + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.demo.producer.Application + + + + + diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java new file mode 100644 index 00000000..fc389f47 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java @@ -0,0 +1,56 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.EnableAsync; + +import com.demo.producer.messages.KafkaMessageHandlerImpl; +import com.demo.producer.producer.SimpleProducer; + +@SpringBootApplication +@EnableAsync +public class Application { + + @Autowired + private SimpleProducer simpleProducer; + + @Value("${vars.autostart:false}") + private boolean autostart; + + @Value("${vars.topic}") + private String topic; + public static void main(String[] args) throws Exception { + SpringApplication.run(Application.class, args); + } + + @EventListener(ApplicationReadyEvent.class) + public void checkAutoRun() throws Exception{ + if (autostart) { + simpleProducer.runAlways(topic, new KafkaMessageHandlerImpl()); + } + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java new file mode 100644 index 00000000..5bc68219 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java @@ -0,0 +1,137 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.controllers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.demo.producer.repository.InfoType; +import com.demo.producer.repository.InfoTypes; +import com.demo.producer.repository.Job.Parameters; +import com.demo.producer.repository.Jobs; +import com.demo.producer.dme.ProducerJobInfo; +import com.demo.producer.messages.KafkaMessageHandlerImpl; +import com.demo.producer.producer.SimpleProducer; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +@RestController +@RequestMapping(path = "/producer", produces = "application/json") +public class ProducerController { + private static final Logger log = LoggerFactory.getLogger(ProducerController.class); + + private static Gson gson = new GsonBuilder().create(); + + private final Jobs jobs; + private final InfoTypes types; + private String topicName = "mytopic"; + + + public ProducerController(@Autowired Jobs jobs, @Autowired InfoTypes types) { + this.jobs = jobs; + this.types = types; + InfoType type1 = InfoType.builder().build(); + type1.setId("type1"); + type1.setKafkaInputTopic(topicName); + type1.setInputJobType("type1"); + type1.setInputJobDefinition(null); + types.put(type1); + } + + @GetMapping("/publish/{numberOfMessages}") + public ResponseEntity publishMessage(@PathVariable int numberOfMessages) { + try { + new SimpleProducer().run(topicName, numberOfMessages, new KafkaMessageHandlerImpl()); + return ResponseEntity.ok("message published successfully .."); + } catch (Exception ex) { + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .build(); + } + } + + @PostMapping("/job/{infoJobId}") + public void jobCallback(@RequestBody String requestBody, @PathVariable String infoJobId) { + ProducerJobInfo request = gson.fromJson(requestBody, ProducerJobInfo.class); + try { + log.info("Adding producer job info " + request.toString()); + this.jobs.addJob(request.id, types.getType(request.typeId), request.owner, + toJobParameters(request.jobData)); + } catch (Exception e) { + log.error("Error adding producer job info: " + request.toString(), e.getMessage()); + } + } + + @PostMapping("/job") + public void jobCallbackNoId(@RequestBody String requestBody) { + ProducerJobInfo request = gson.fromJson(requestBody, ProducerJobInfo.class); + try { + log.info("Adding producer job info "+request.toString()); + this.jobs.addJob(request.id, types.getType(request.typeId), request.owner, + toJobParameters(request.jobData)); + } catch (Exception e) { + log.error("Error adding producer job info: " + request.toString(), e.getMessage()); + } + } + + private Parameters toJobParameters(Object jobData) { + String json = gson.toJson(jobData); + return gson.fromJson(json, Parameters.class); + } + + @GetMapping("/job") + public ResponseEntity getJobs() { + try { + log.info("Get all jobs"); + return new ResponseEntity<>(this.jobs.getAll().toString(), HttpStatus.OK); + } catch (Exception e) { + log.error("Error finding jobs", e.getMessage()); + return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR); + } + } + + @DeleteMapping("/job/{infoJobId}") + public ResponseEntity deleteJob(@PathVariable String infoJobId) { + try { + log.info("Delete Job" + infoJobId); + this.jobs.delete(infoJobId); + return new ResponseEntity<>("Deleted job:" + infoJobId, HttpStatus.OK); + } catch (Exception e) { + log.error("Error finding job " + infoJobId, e.getMessage()); + return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR); + } + } + + @GetMapping("/supervision") + public ResponseEntity getSupervision() { + log.info("Get Supervision"); + return new ResponseEntity<>("Ok", HttpStatus.OK); + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java new file mode 100644 index 00000000..e503b849 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java @@ -0,0 +1,93 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.controllers; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import com.demo.producer.messages.KafkaMessageHandlerImpl; +import com.demo.producer.messages.ApplicationMessageHandlerImpl; +import com.demo.producer.producer.SimpleProducer; + +@RestController +public class ThreadsController { + private static final Logger log = LoggerFactory.getLogger(ThreadsController.class); + + @Autowired + private SimpleProducer simpleProducer; + + private Thread producerThread; + + @Async + @GetMapping("/startProducer/{topicName}") + public CompletableFuture startProducer(@PathVariable("topicName") String topicName) { + try { + producerThread = new Thread(() -> { + try { + simpleProducer.runAlways(topicName, new ApplicationMessageHandlerImpl()); + } catch (Exception e) { + log.error("Error starting producer on: " + topicName, e.getMessage()); + } + }); + producerThread.start(); + return CompletableFuture.completedFuture("Producer started for topic: " + topicName); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + @GetMapping("/stopProducer") + public String stopProducer() { + if (producerThread != null && producerThread.isAlive()) { + try { + simpleProducer.shutdown(); + } catch (Exception e) { + log.error("Error stopping producer Thread", e.getMessage()); + } + return "Producer stopped successfully"; + } else { + return "No active producer to stop"; + } + } + + @GetMapping("/publish/{numberOfMessages}/on/{topicName}") + public CompletableFuture publishNMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) { + try { + producerThread = new Thread(() -> { + try { + simpleProducer.run(topicName, numberOfMessages, new KafkaMessageHandlerImpl()); + } catch (Exception e) { + log.error("Error producing " + numberOfMessages + "on " + topicName, e.getMessage()); + } + }); + producerThread.start(); + return CompletableFuture.completedFuture("Producer started for topic: " + topicName); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java new file mode 100644 index 00000000..5f87f4dd --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java @@ -0,0 +1,41 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.dme; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +public class ProducerInfoTypeInfo { + + @SerializedName("info_job_data_schema") + @JsonProperty(value = "info_job_data_schema", required = true) + public Object jobDataSchema; + + @SerializedName("info_type_information") + @JsonProperty(value = "info_type_information", required = true) + public Object typeSpecificInformation; + +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java new file mode 100644 index 00000000..c55d4c5e --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java @@ -0,0 +1,53 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.dme; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +public class ProducerJobInfo { + + @SerializedName("info_job_identity") + @JsonProperty("info_job_identity") + public String id = ""; + + @SerializedName("info_type_identity") + @JsonProperty("info_type_identity") + public String typeId = ""; + + @SerializedName("info_job_data") + @JsonProperty("info_job_data") + public Object jobData; + + @SerializedName("owner") + @JsonProperty("owner") + public String owner = ""; + + @SerializedName("last_updated") + @JsonProperty("last_updated") + public String lastUpdated = ""; + +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java new file mode 100644 index 00000000..5f279d24 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java @@ -0,0 +1,47 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.dme; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import java.util.Collection; + +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +public class ProducerRegistrationInfo { + + @SerializedName("supported_info_types") + @JsonProperty(value = "supported_info_types", required = true) + public Collection supportedTypeIds; + + @SerializedName("info_job_callback_url") + @JsonProperty(value = "info_job_callback_url", required = true) + public String jobCallbackUrl; + + @SerializedName("info_producer_supervision_callback_url") + @JsonProperty(value = "info_producer_supervision_callback_url", required = true) + public String producerSupervisionCallbackUrl; + +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java new file mode 100644 index 00000000..d00d0747 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java @@ -0,0 +1,43 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.messages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class AbstractSimpleKafka { + private static final Logger log = LoggerFactory.getLogger(AbstractSimpleKafka.class); + + public AbstractSimpleKafka(){ + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + shutdown(); + } + }); + log.info(MessageHelper.getSimpleJSONObject("Created the Shutdown Hook").toJSONString()); + } + + public abstract void shutdown(); + public abstract void runAlways(String topicName, KafkaMessageHandler callback) throws Exception; + public abstract void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception; +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java new file mode 100644 index 00000000..beae7943 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java @@ -0,0 +1,39 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.messages; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ApplicationMessageHandlerImpl implements KafkaMessageHandler { + + private static final Logger log = LoggerFactory.getLogger(ApplicationMessageHandlerImpl.class); + + @Override + public void processMessage(String topicName, ConsumerRecord message) throws Exception { + String source = KafkaMessageHandlerImpl.class.getName(); + JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value()); + System.out.println(obj.toJSONString()); + log.info(obj.toJSONString()); + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java new file mode 100644 index 00000000..814c88c2 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java @@ -0,0 +1,28 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.messages; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +@FunctionalInterface +public interface KafkaMessageHandler { + void processMessage(String topicName, ConsumerRecord message) throws Exception; +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java new file mode 100644 index 00000000..06f70260 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java @@ -0,0 +1,37 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.messages; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.json.simple.JSONObject; + +public class KafkaMessageHandlerImpl implements KafkaMessageHandler { + private static final Logger log = LoggerFactory.getLogger(KafkaMessageHandlerImpl.class); + + @Override + public void processMessage(String topicName, ConsumerRecord message) throws Exception { + String source = KafkaMessageHandlerImpl.class.getName(); + JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value()); + log.info(obj.toJSONString()); + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java new file mode 100644 index 00000000..7a5e45b6 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java @@ -0,0 +1,71 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.messages; + +import java.util.Properties; +import java.util.Random; +import org.json.simple.JSONObject; + +public class MessageHelper { + + private static Properties props; + + public static String getRandomString() { + int leftLimit = 48; // numeral '0' + int rightLimit = 122; // letter 'z' + int targetStringLength = 10; + Random random = new Random(); + + return random.ints(leftLimit, rightLimit + 1).filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97)) + .limit(targetStringLength) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString(); + } + + @SuppressWarnings("unchecked") // Using only strings + public static JSONObject getMessageLogEntryJSON(String source, String topic, String key, String message) + throws Exception { + JSONObject obj = new JSONObject(); + String bootstrapServers = getProperties().getProperty("bootstrap.servers"); + obj.put("bootstrapServers", bootstrapServers); + obj.put("source", source); + obj.put("topic", topic); + obj.put("key", key); + obj.put("message", message); + + return obj; + } + + + @SuppressWarnings("unchecked") + public static JSONObject getSimpleJSONObject(String message){ + JSONObject obj = new JSONObject(); + obj.put("message", message); + return obj; + } + + protected static Properties getProperties() throws Exception { + if (props == null) { + props = PropertiesHelper.getProperties(); + } + return props; + } + +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java new file mode 100644 index 00000000..7dc2b1ef --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java @@ -0,0 +1,58 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.messages; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.demo.producer.producer.SimpleProducer; + +@Component +public class PropertiesHelper { + private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class); + + public static Properties getProperties() throws Exception { + Properties props = null; + try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) { + props = new Properties(); + if (input == null) { + log.error("Found no configuration file in resources"); + throw new Exception("Sorry, unable to find config.properties"); + } + props.load(input); + String kafkaServers = System.getenv("KAFKA_SERVERS"); + if (kafkaServers != null) { + props.setProperty("bootstrap.servers", kafkaServers); + log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers); + } else { + log.info("Env variable KAFKA_SERVERS not found, defaulting to config file"); + } + } catch (IOException e) { + log.error("Error reading configuration file: ", e.getMessage()); + } + return props; + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java new file mode 100644 index 00000000..6c9858b1 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java @@ -0,0 +1,112 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.producer.producer; + +import java.util.UUID; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.json.simple.JSONObject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import com.demo.producer.messages.AbstractSimpleKafka; +import com.demo.producer.messages.KafkaMessageHandler; +import com.demo.producer.messages.MessageHelper; +import com.demo.producer.messages.PropertiesHelper; + +import lombok.Getter; +import lombok.Setter; + +@Component +@Getter +@Setter +public class SimpleProducer extends AbstractSimpleKafka { + private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class); + + @Value("${vars.time:1000}") + private int TIME; + + private KafkaProducer kafkaProducer; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception { + for (int i = 0; i < numberOfMessages; i++) { + String key = UUID.randomUUID().toString(); + String message = MessageHelper.getRandomString(); + if (this.getTopicName() == null) { + this.setTopicName(topicName); + } + this.send(topicName, key, message); + Thread.sleep(TIME); + } + this.shutdown(); + } + + public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception { + while (true) { + String key = UUID.randomUUID().toString(); + String message = MessageHelper.getRandomString(); + this.send(topicName, key, message); + Thread.sleep(TIME); + } + } + + private String topicName = null; + + private void setTopicName(String topicName) { + this.topicName = topicName; + } + + private String getTopicName() { + return this.topicName; + } + + protected void send(String topicName, String key, String message) throws Exception { + String source = SimpleProducer.class.getName(); + ProducerRecord producerRecord = new ProducerRecord<>(topicName, key, message); + JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, key, message); + log.info(obj.toJSONString()); + getKafkaProducer().send(producerRecord); + } + + private KafkaProducer getKafkaProducer() throws Exception { + if (this.kafkaProducer == null) { + Properties props = PropertiesHelper.getProperties(); + this.kafkaProducer = new KafkaProducer<>(props); + } + return this.kafkaProducer; + } + + public void shutdown(){ + closed.set(true); + try { + log.info(MessageHelper.getSimpleJSONObject("Shutting down producer").toJSONString()); + getKafkaProducer().close(); + } catch (Exception e) { + log.error("Error shutting down the Producer ", e.getMessage()); + } + + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java new file mode 100644 index 00000000..9f9c0cfc --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java @@ -0,0 +1,49 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.repository; + +import lombok.AccessLevel; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@ToString +@Builder(access = AccessLevel.PUBLIC) +public class InfoType { + + @Getter + @Setter + private String id; + + @Getter + @Setter + private String kafkaInputTopic; + + @Getter + @Setter + private String inputJobType; + + @Getter + @Setter + private Object inputJobDefinition; + +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java new file mode 100644 index 00000000..b7a99144 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java @@ -0,0 +1,81 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.repository; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; + +@Component +public class InfoTypes { + private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class); + + private Map allTypes = new HashMap<>(); + + public InfoTypes(Collection types) { + for (InfoType type : types) { + put(type); + } + } + + public synchronized InfoType get(String id) { + return allTypes.get(id); + } + + public synchronized InfoType getType(String id) throws Exception { + InfoType type = allTypes.get(id); + if (type == null) { + throw new Exception("Could not find type: " + id + HttpStatus.NOT_FOUND.toString()); + } + return type; + } + + public static class ConfigFile { + Collection types; + } + + public synchronized void put(InfoType type) { + logger.debug("Put type: {}", type.getId()); + allTypes.put(type.getId(), type); + } + + public synchronized Iterable getAll() { + return new Vector<>(allTypes.values()); + } + + public synchronized Collection typeIds() { + return allTypes.keySet(); + } + + public synchronized int size() { + return allTypes.size(); + } + + public synchronized void clear() { + allTypes.clear(); + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java new file mode 100644 index 00000000..cafddde1 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java @@ -0,0 +1,71 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.repository; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +@ToString +public class Job { + @Builder + public static class Parameters { + + @Builder + @EqualsAndHashCode + public static class KafkaDeliveryInfo { + @Getter + private String topic; + + @Getter + private String bootStrapServers; + + @JsonProperty(value = "numberOfMessages") + @Getter + private int numberOfMessages; + } + + @Getter + private KafkaDeliveryInfo deliveryInfo; + } + + @Getter + private final String id; + + @Getter + private final InfoType type; + + @Getter + private final String owner; + + @Getter + private final Parameters parameters; + + public Job(String id, InfoType type, String owner, Parameters parameters) { + this.id = id; + this.type = type; + this.owner = owner; + this.parameters = parameters; + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java new file mode 100644 index 00000000..f9cffd8e --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java @@ -0,0 +1,83 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer.repository; + +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.demo.producer.repository.Job.Parameters; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +@Component +public class Jobs { + private static final Logger logger = LoggerFactory.getLogger(Jobs.class); + + private Map allJobs = new HashMap<>(); + + public Jobs() { + } + + public synchronized Job getJob(String id) throws Exception { + Job job = allJobs.get(id); + if (job == null) { + throw new Exception("Could not find job: " + id); + } + return job; + } + + public synchronized Job get(String id) { + return allJobs.get(id); + } + + public void addJob(String id, InfoType type, String owner, Parameters parameters) { + Job job = new Job(id, type, owner, parameters); + this.put(job); + } + + private synchronized void put(Job job) { + logger.debug("Put job: {}", job.getId()); + allJobs.put(job.getId(), job); + } + + public synchronized Iterable getAll() { + return new Vector<>(allJobs.values()); + } + + public synchronized int size() { + return allJobs.size(); + } + + public synchronized Job delete(String id) { + return allJobs.remove(id); + } + + @Override + public String toString() { + Gson gson = new GsonBuilder().create(); + return gson.toJson(allJobs); + } +} diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml new file mode 100644 index 00000000..31966dff --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml @@ -0,0 +1,28 @@ +# ============LICENSE_START=============================================== +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +# +vars: + time: 1000 + autostart: true + topic: mytopic #This topic is used only in autostart + +spring: + application: + name: demoProducer + +logging: + level: + root: INFO \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties b/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties new file mode 100644 index 00000000..83e55392 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties @@ -0,0 +1,44 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +# The location of the Kafka server +bootstrap.servers=${KAFKA_SERVERS:localhost:9092} + +# the default group ID +group.id=test-group + +# the default topic to use if one is not provided +default.topic=magic-topic + +# The number of records to pull of the stream every time +# the client takes a trip out to Kafka +max.poll.records=10 + +# Make Kafka keep track of record reads by the consumer +enable.auto.commit=true + +# The time in milliseconds to Kafka write the offset of the last message read +auto.commit.interval.ms=500 + +# classes for serializing and deserializing messages +key.serializer=org.apache.kafka.common.serialization.StringSerializer +value.serializer=org.apache.kafka.common.serialization.StringSerializer +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +value.deserializer=org.apache.kafka.common.serialization.StringDeserializer + +enable.idempotence=false diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml b/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml new file mode 100644 index 00000000..3dbfe399 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml @@ -0,0 +1,44 @@ + + + + logging.log + + logging.%i.log.zip + 1 + 10 + + + 10MB + + + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + + + + + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + + + + + + + diff --git a/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java new file mode 100644 index 00000000..da778739 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java @@ -0,0 +1,105 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer; + +import com.demo.producer.producer.SimpleProducer; +import org.apache.kafka.clients.producer.KafkaProducer; + +import com.demo.producer.messages.KafkaMessageHandler; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +class SimpleProducerTest { + + private final int wait = 1000; + private final String topicName = "testTopic"; + + @Mock + private KafkaProducer kafkaProducer; + + @InjectMocks + @Autowired + private SimpleProducer simpleProducer; + + private AutoCloseable closable; + + @BeforeEach + void setUp() { + closable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + public void close() throws Exception { + closable.close(); + } + + @Test + @SuppressWarnings("unchecked") //sending only Strings + void testRun() throws Exception { + int numberOfMessages = 10; + KafkaMessageHandler callback = mock(KafkaMessageHandler.class); + + simpleProducer.run(topicName, numberOfMessages, callback); + + verify(kafkaProducer, times(numberOfMessages)).send(any(ProducerRecord.class)); + verify(kafkaProducer, times(1)).close(); + } + + @Test + @SuppressWarnings("unchecked") //sending only Strings + void testRunAlways() throws Exception { + KafkaMessageHandler callback = mock(KafkaMessageHandler.class); + simpleProducer.setTIME(wait); + // Mocking behavior to break out of the loop after a few iterations + doAnswer(invocation -> { + simpleProducer.shutdown(); + return null; + }).when(kafkaProducer).send(any(ProducerRecord.class)); + + // Invoking runAlways() in a separate thread to avoid an infinite loop + Thread thread = new Thread(() -> { + try { + simpleProducer.runAlways(topicName, callback); + } catch (Exception e) { + } + }); + thread.start(); + + // Let the thread execute for some time (e.g., 1 second) + Thread.sleep(wait); + + // Interrupting the thread to stop the infinite loop + thread.interrupt(); + + verify(kafkaProducer, atLeastOnce()).send(any(ProducerRecord.class)); + verify(kafkaProducer, times(1)).close(); + } + +} diff --git a/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java new file mode 100644 index 00000000..d3587f10 --- /dev/null +++ b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java @@ -0,0 +1,89 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package com.demo.producer; + +import com.demo.producer.controllers.ThreadsController; +import com.demo.producer.producer.SimpleProducer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.mock.web.MockHttpServletResponse; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.*; + +@SpringBootTest +@ContextConfiguration(classes = SimpleProducer.class) +public class ThreadsControllerTest { + + @Mock + @Autowired + private SimpleProducer simpleProducer; + + @InjectMocks + private ThreadsController threadsController; + + private MockHttpServletRequest request; + private MockHttpServletResponse response; + + private AutoCloseable closable; + + @BeforeEach + public void setUp() { + closable = MockitoAnnotations.openMocks(this); + request = new MockHttpServletRequest(); + response = new MockHttpServletResponse(); + RequestContextHolder.setRequestAttributes(new ServletRequestAttributes(request, response)); + } + + @AfterEach + public void close() throws Exception { + closable.close(); + } + + @Test + public void testStopProducerWhenNoActiveProducer() throws Exception { + String result = threadsController.stopProducer(); + assertEquals("No active producer to stop", result); + verify(simpleProducer, never()).shutdown(); + } + + @Test + public void testPublishNMessage() throws Exception { + String topicName = "testTopic"; + int numberOfMessages = 10; + CompletableFuture future = CompletableFuture.completedFuture("Producer started for topic: " + topicName); + CompletableFuture result = threadsController.publishNMessage(numberOfMessages, topicName); + assertEquals(future.getClass(), result.getClass()); + } + +} diff --git a/sample-services/ics-producer-consumer/red.sh b/sample-services/ics-producer-consumer/red.sh new file mode 100644 index 00000000..12085fe4 --- /dev/null +++ b/sample-services/ics-producer-consumer/red.sh @@ -0,0 +1,189 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +#!/bin/bash +skip_build=false +no_console=false + +# Parse command line arguments +while [[ $# -gt 0 ]]; do + case $1 in + --skip-build) + skip_build=true + shift + ;; + --no-console) + no_console=true + shift + ;; + *) + echo "Unknown option: $1" + exit 1 + ;; + esac +done +# Source the utils script +source utils.sh + +# Check Prerequisites +checkJava +checkDocker +checkDockerCompose + +if ! $skip_build; then + # Make build the demo docker image + cd ./producer/ + make build + cd ../consumer/ + make build + cd .. +fi + +# Start the Docker containers in detached mode +docker-compose up -d + +# Wait for the Kafka container to be running +wait_for_container "kafka-zkless" "Kafka Server started" +space + +if ! $no_console; then + echo "Start RedPanda Console" + docker-compose -f docker-composeRedPanda.yaml up -d + space + + echo "Start NONRTRIC control panel" + docker-compose -f ./docker-compose/docker-compose.yaml -f ./docker-compose/control-panel/docker-compose.yaml -f ./docker-compose/nonrtric-gateway/docker-compose.yaml up -d + space +fi + +# Once Kafka container is running, start the producers and consumers +echo "Kafka container is up and running. Starting producer and consumer..." +space + +echo "Start 1 Producer on mytopic" +curl -X GET http://localhost:8080/startProducer/mytopic +space + +echo "Start 1 Consumer on mytopic" +curl -X GET http://localhost:8081/startConsumer/mytopic +space + +sleep 10 + +echo "Sending type1 to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-producer/v1/info-types/type1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "info_job_data_schema": { + "$schema":"http://json-schema.org/draft-07/schema#", + "title":"STD_Type1_1.0.0", + "description":"Type 1", + "type":"object" + } +}' + +echo "Getting types from ICS" +curl -X 'GET' 'http://localhost:8083/data-producer/v1/info-types/type1' +space + +echo "Sending Producer infos to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-producer/v1/info-producers/1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "info_producer_supervision_callback_url": "http://kafka-producer:8080/producer/supervision", + "supported_info_types": [ + "type1" + ], + "info_job_callback_url": "http://kafka-producer:8080/producer/job" +}' + +echo "Getting Producers Infos from ICS" +curl -H 'Content-Type: application/json' 'http://localhost:8083/data-producer/v1/info-producers/1' +space + +echo "Sending Consumer Job infos to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-consumer/v1/info-jobs/1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "info_type_id": "type1", + "job_owner": "demo", + "job_definition": { + "deliveryInfo": { + "topic": "mytopic", + "bootStrapServers": "http://kafka-zkless:9092", + "numberOfMessages": 0 + } + }, + "job_result_uri": "http://kafka-producer:8080/producer/job", + "status_notification_uri": "http://kafka-producer:8080/producer/supervision" +}' + +echo "Getting Consumer Job Infos from ICS" +curl -H 'Content-Type: application/json' 'http://localhost:8083/data-consumer/v1/info-jobs/1' +space + +echo "Sending Consumer Subscription Job infos to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "status_result_uri": "http://kafka-consumer:8081/info-type-status", + "owner": "owner" +}' +echo "Getting Consumer Subscription Job infos from ICS" +curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json' +space + +sleep 5 +echo "ICS Producer Docker logs " +docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions' +space +echo "Demo Producer Docker logs " +docker logs kafka-producer | grep c.d.p.p.SimpleProducer +space +echo "Demo Consumer Docker logs " +docker logs kafka-consumer | grep c.d.c.c.SimpleConsumer +space + +if ! $no_console; then + echo "Red Panda Console: http://localhost:8888" + echo "Control Panel Console: http://localhost:8181" +fi + +echo "Done." + +containers=("kafka-producer" "kafka-consumer") + +for container in "${containers[@]}"; do + if docker logs "$container" | grep -q ERROR; then + echo "Errors found in logs of $container" + echo "FAIL" + exit 1 + else + echo "No errors found in logs of $container" + fi +done +echo "SUCCESS" +exit 0 diff --git a/sample-services/ics-producer-consumer/runconsumer.sh b/sample-services/ics-producer-consumer/runconsumer.sh new file mode 100644 index 00000000..9ef776a8 --- /dev/null +++ b/sample-services/ics-producer-consumer/runconsumer.sh @@ -0,0 +1,2 @@ +#!/bin/bash +curl -X GET http://localhost:8081/startConsumer/mytopic \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/runproducer.sh b/sample-services/ics-producer-consumer/runproducer.sh new file mode 100644 index 00000000..f9f5a7e9 --- /dev/null +++ b/sample-services/ics-producer-consumer/runproducer.sh @@ -0,0 +1,2 @@ +#!/bin/bash +curl -X GET http://localhost:8080/startProducer/mytopic \ No newline at end of file diff --git a/sample-services/ics-producer-consumer/start.sh b/sample-services/ics-producer-consumer/start.sh new file mode 100755 index 00000000..18317a04 --- /dev/null +++ b/sample-services/ics-producer-consumer/start.sh @@ -0,0 +1,180 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +#!/bin/bash +source utils.sh + +PREFIX="nexus3.o-ran-sc.org:10004" +VERSION="0.0.1" + +# Create a network for Kafka Containers +docker network create kafka-net + +# Start Kafka +docker run -d \ + --network kafka-net \ + --name kafka-zkless \ + -p 9092:9092 \ + -e LOG_DIR="/tmp/logs" \ + -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" \ + -e KAFKA_LISTENERS="PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093" \ + -e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kafka-zkless:29092,PLAINTEXT_HOST://kafka-zkless:9092" \ + quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 \ + /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && \ + bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && \ + bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$KAFKA_ADVERTISED_LISTENERS --override listener.security.protocol.map=$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP --override listeners=$KAFKA_LISTENERS' + +# Start ICS +docker run -d \ + --network kafka-net \ + --name informationcoordinatorservice \ + -p 8083:8083 \ + -v ./application.yaml:/opt/app/information-coordinator-service/config/application.yaml \ + nexus3.o-ran-sc.org:10001/o-ran-sc/nonrtric-plt-informationcoordinatorservice:1.6.0 + +# Start Producer +docker run -d \ + --network kafka-net \ + --name kafka-producer \ + -p 8080:8080 \ + -e KAFKA_SERVERS=kafka-zkless:9092 \ + $PREFIX/o-ran-sc/nonrtric-sample-icsproducer:$VERSION + +#Start Consumer +docker run -d \ + --network kafka-net \ + --name kafka-consumer \ + -p 8081:8081 \ + -e KAFKA_SERVERS=kafka-zkless:9092 \ + $PREFIX/o-ran-sc/nonrtric-sample-icsconsumer:$VERSION + +# Wait for the Kafka container to be running +wait_for_container "kafka-zkless" "Kafka Server started" +wait_for_container "kafka-producer" "Started Application" +wait_for_container "kafka-consumer" "Started Application" + +# Once Kafka container is running, start the producers and consumers +echo "Kafka container is up and running. Starting producer and consumer..." +space + +echo "Start 1 Producer on mytopic" +curl -X GET http://localhost:8080/startProducer/mytopic +space + +echo "Start 1 Consumer on mytopic" +curl -X GET http://localhost:8081/startConsumer/mytopic +space + +sleep 10 + +echo "Sending type1 to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-producer/v1/info-types/type1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "info_job_data_schema": { + "$schema":"http://json-schema.org/draft-07/schema#", + "title":"STD_Type1_1.0.0", + "description":"Type 1", + "type":"object" + } +}' + +echo "Getting types from ICS" +curl -X 'GET' 'http://localhost:8083/data-producer/v1/info-types/type1' +space + +echo "Sending Producer infos to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-producer/v1/info-producers/1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "info_producer_supervision_callback_url": "http://kafka-producer:8080/producer/supervision", + "supported_info_types": [ + "type1" + ], + "info_job_callback_url": "http://kafka-producer:8080/producer/job" +}' + +echo "Getting Producers Infos from ICS" +curl -H 'Content-Type: application/json' 'http://localhost:8083/data-producer/v1/info-producers/1' +space + +echo "Sending Consumer Job infos to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-consumer/v1/info-jobs/1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "info_type_id": "type1", + "job_owner": "demo", + "job_definition": { + "deliveryInfo": { + "topic": "mytopic", + "bootStrapServers": "http://kafka-zkless:9092", + "numberOfMessages": 0 + } + }, + "job_result_uri": "http://kafka-producer:8080/producer/job", + "status_notification_uri": "http://kafka-producer:8080/producer/supervision" +}' + +echo "Getting Consumer Job Infos from ICS" +curl -H 'Content-Type: application/json' 'http://localhost:8083/data-consumer/v1/info-jobs/1' +space + +echo "Sending Consumer Subscription Job infos to ICS" +curl -X 'PUT' \ + 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "status_result_uri": "http://kafka-consumer:8081/info-type-status", + "owner": "owner" +}' +echo "Getting Consumer Subscription Job infos from ICS" +curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json' +space + +sleep 5 +echo "ICS Producer Docker logs " +docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions' +space +echo "Demo Producer Docker logs " +docker logs kafka-producer | grep c.d.p.p.SimpleProducer +space +echo "Demo Consumer Docker logs " +docker logs kafka-consumer | grep c.d.c.c.SimpleConsumer +space + +echo "Done." + +containers=("kafka-producer" "kafka-consumer") + +for container in "${containers[@]}"; do + if docker logs "$container" | grep -q ERROR; then + echo "Errors found in logs of $container" + echo "FAIL" + exit 1 + else + echo "No errors found in logs of $container" + fi +done +echo "SUCCESS" +exit 0 diff --git a/sample-services/ics-producer-consumer/stop.sh b/sample-services/ics-producer-consumer/stop.sh new file mode 100644 index 00000000..60fba311 --- /dev/null +++ b/sample-services/ics-producer-consumer/stop.sh @@ -0,0 +1,27 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +#!/bin/bash +docker-compose down + +docker-compose -f docker-composeRedPanda.yaml down + +docker-compose -f ./docker-compose/docker-compose.yaml \ + -f ./docker-compose/control-panel/docker-compose.yaml \ + -f ./docker-compose/nonrtric-gateway/docker-compose.yaml \ + down diff --git a/sample-services/ics-producer-consumer/utils.sh b/sample-services/ics-producer-consumer/utils.sh new file mode 100644 index 00000000..68d19ecf --- /dev/null +++ b/sample-services/ics-producer-consumer/utils.sh @@ -0,0 +1,98 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +#!/bin/bash + +checkJava() { + if ! command -v java >/dev/null 2>&1; then + echo "Java is not installed. Please install Java." + echo "Suggested fix for ubuntu:" + echo "sudo apt install default-jdk" + exit 1 + else + echo "Java is installed." + fi +} + +checkMaven() { + if mvn -v >/dev/null 2>&1; then + echo "Maven is installed." + else + echo "Maven is not installed. Please install Maven." + echo "Suggested fix for ubuntu:" + echo "sudo apt install maven" + exit 1 + fi +} + +checkDocker() { + if ! docker -v > /dev/null 2>&1; then + echo "Docker is not installed. Please install Docker." + echo "Suggested fix for ubuntu:" + echo "sudo apt-get update" + echo "sudo apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release" + echo "curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg" + echo "echo \"deb [arch=\$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \$(lsb_release -cs) stable\" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null" + echo "sudo apt-get update" + echo "sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin" + echo "sudo usermod -aG docker \$USER" + echo "newgrp docker" + exit 1 + else + echo "Docker is installed." + fi +} + +checkDockerCompose() { + if ! docker-compose -v > /dev/null 2>&1; then + echo "docker-compose is not installed. Please install docker-compose" + echo "Suggested fix for ubuntu:" + echo "sudo apt-get install docker-compose-plugin" + exit 1 + else + echo "docker-compose is installed." + fi +} + +# Function to wait for a Docker container to be running and log a specific string +wait_for_container() { + local container_name="$1" + local log_string="$2" + + while ! docker inspect "$container_name" &>/dev/null; do + echo "Waiting for container '$container_name' to be created..." + sleep 5 + done + + while [ "$(docker inspect -f '{{.State.Status}}' "$container_name")" != "running" ]; do + echo "Waiting for container '$container_name' to be running..." + sleep 5 + done + + # Check container logs for the specified string + while ! docker logs "$container_name" 2>&1 | grep "$log_string"; do + echo "Waiting for '$log_string' in container logs of '$container_name'..." + sleep 5 + done +} + +space() { + echo "" + echo "++++++++++++++++++++++++++++++++++++++++++++++++++++" + echo "" +} -- 2.16.6