ICS sample producer and consumer 76/12676/17
authorlapentafd <francesco.lapenta@est.tech>
Fri, 5 Apr 2024 08:49:44 +0000 (09:49 +0100)
committerlapentafd <francesco.lapenta@est.tech>
Thu, 25 Apr 2024 10:17:14 +0000 (11:17 +0100)
Sample Java producer and consumer that integrates with ICS callbacks.

Issue-ID: NONRTRIC-965
Change-Id: I7319b46802444af130a3bd0d5c6bdd12f97c9904
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
69 files changed:
sample-services/ics-producer-consumer/.gitignore [new file with mode: 0644]
sample-services/ics-producer-consumer/README.md [new file with mode: 0644]
sample-services/ics-producer-consumer/application.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/Dockerfile [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/Makefile [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/container-tag.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/pom.xml [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml [new file with mode: 0644]
sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/.env [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/LICENSE.txt [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/README.md [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/docker-composeRedPanda.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/Dockerfile [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/Makefile [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/container-tag.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/pom.xml [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/resources/config.properties [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java [new file with mode: 0644]
sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java [new file with mode: 0644]
sample-services/ics-producer-consumer/red.sh [new file with mode: 0644]
sample-services/ics-producer-consumer/runconsumer.sh [new file with mode: 0644]
sample-services/ics-producer-consumer/runproducer.sh [new file with mode: 0644]
sample-services/ics-producer-consumer/start.sh [new file with mode: 0755]
sample-services/ics-producer-consumer/stop.sh [new file with mode: 0644]
sample-services/ics-producer-consumer/utils.sh [new file with mode: 0644]

diff --git a/sample-services/ics-producer-consumer/.gitignore b/sample-services/ics-producer-consumer/.gitignore
new file mode 100644 (file)
index 0000000..851f236
--- /dev/null
@@ -0,0 +1,26 @@
+.idea
+.DS_Store
+target
+logging.log
+
+*.class
+logs/
+.vscode
+*.log
+
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
diff --git a/sample-services/ics-producer-consumer/README.md b/sample-services/ics-producer-consumer/README.md
new file mode 100644 (file)
index 0000000..1731c17
--- /dev/null
@@ -0,0 +1,102 @@
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+# Automatic
+### Using Kafka with a Java Producer and Consumer
+
+Run the demo script
+It will check prerequisites, build a consumer and producer, and run them with kafka and ICS.
+
+```shell
+./start.sh
+```
+Or run the other script to bring up RedPanda on port 8888 and NONRTRIC control panel UI on port 8181
+
+```shell
+./red.sh
+```
+
+For a faster execution you can add:
+
+--skip-build to skip creating the app jar and building the docker images
+
+--no-console to skip running RedPanda and NONRTRIC control panel
+
+```shell
+./red.sh --skip-build --no-console
+```
+# Manual
+### Run Kafka in a container
+
+```shell
+docker-compose up -d kafka-zkless
+```
+
+### Starting the REST application individually
+
+In a new terminal window:
+
+```shell
+mvn spring-boot:run
+```
+
+### Starting a producer
+
+```shell
+sh ./runproducer.sh
+```
+
+### Starting a Consumer
+
+```shell
+sh ./runproducer.sh
+```
+
+## Reading the logs
+
+A sample of the output is as follows:
+
+```
+Demo Producer Docker logs
+
+2024-04-02 12:48:05 INFO  c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"ygHwxXSIxW","key":"f8f1a7a7-a78e-4c7d-9b8d-108bb0cc9e2c"}
+2024-04-02 12:48:06 INFO  c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"KNIbP10zfN","key":"b058d00f-bbcd-4d2c-936b-6327847d4c2a"}
+2024-04-02 12:48:07 INFO  c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"V6fH1NkdeH","key":"ae1a83a3-d8a7-40c8-9d98-529230f8b585"}
+2024-04-02 12:48:08 INFO  c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"m76qvRFh6f","key":"abccde52-fa72-4fd4-99ab-5bc21514d825"}
+2024-04-02 12:48:09 INFO  c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"t7FJYnFr43","key":"0602239e-34e9-45a6-a04a-3c67b4c7d9e4"}
+
+++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+Demo Consumer Docker logs
+
+2024-04-02 12:48:05 INFO  c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: ygHwxXSIxW"}
+2024-04-02 12:48:06 INFO  c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: KNIbP10zfN"}
+2024-04-02 12:48:07 INFO  c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: V6fH1NkdeH"}
+2024-04-02 12:48:08 INFO  c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: m76qvRFh6f"}
+2024-04-02 12:48:09 INFO  c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: t7FJYnFr43"}
+
+++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+ICS logs
+
+2024-04-02T12:48:05.615Z DEBUG 1 --- [or-http-epoll-2] o.o.i.c.r1producer.ProducerCallbacks     : Job subscription 1 started OK 1
+2024-04-02T12:48:05.820Z DEBUG 1 --- [io-8083-exec-10] o.o.i.repository.InfoTypeSubscriptions   : Added type status subscription 1
+```
+
+The script will fail (exit 1) if there are anny ERRORS logged in the kafka-producer and kafka-consumer.
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/application.yaml b/sample-services/ics-producer-consumer/application.yaml
new file mode 100644 (file)
index 0000000..cfb4f78
--- /dev/null
@@ -0,0 +1,83 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+spring:
+  profiles:
+    active: prod
+  main:
+    allow-bean-definition-overriding: true
+  aop:
+    auto: false
+springdoc:
+  show-actuator: true
+management:
+  endpoints:
+    web:
+      exposure:
+        # Enabling of springboot actuator features. See springboot documentation.
+        include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown"
+  endpoint:
+    shutdown:
+      enabled: true
+lifecycle:
+  timeout-per-shutdown-phase: "20s"
+logging:
+  # Configuration of logging
+  level:
+    ROOT: INFO
+    org.springframework: INFO
+    org.springframework.data: INFO
+    org.springframework.web.reactive.function.client.ExchangeFunctions: INFO
+    org.oransc.ics: DEBUG
+  file:
+    name: /var/log/information-coordinator-service/application.log
+server:
+   # Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework.
+   # See springboot documentation.
+   port : 8434
+   http-port: 8083
+   ssl:
+      key-store-type: JKS
+      key-store-password: policy_agent
+      key-store: /opt/app/information-coordinator-service/etc/cert/keystore.jks
+      key-password: policy_agent
+      key-alias: policy_agent
+   shutdown: "graceful"
+app:
+  webclient:
+    # Configuration of the trust store used for the HTTP client (outgoing requests)
+    # The file location and the password for the truststore is only relevant if trust-store-used == true
+    # Note that the same keystore as for the server is used.
+    trust-store-used: false
+    trust-store-password: policy_agent
+    trust-store: /opt/app/information-coordinator-service/etc/cert/truststore.jks
+    # Configuration of usage of HTTP Proxy for the southbound accesses.
+    # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
+    http.proxy-host:
+    http.proxy-port: 0
+  vardata-directory: /var/information-coordinator-service
+  # If the file name is empty, no authorization token is used
+  auth-token-file:
+  # A URL to authorization provider such as OPA. Each time a information job is accessed, a call to this
+  # authorization provider is done for access control. If this is empty, no fine grained access control is done.
+  info-job-authorization-agent:
+  # S3 object store usage is enabled by defining the bucket to use. This will override the vardata-directory parameter.
+  s3:
+    endpointOverride: http://localhost:9000
+    accessKeyId: minio
+    secretAccessKey: miniostorage
+    bucket:
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/consumer/Dockerfile b/sample-services/ics-producer-consumer/consumer/Dockerfile
new file mode 100644 (file)
index 0000000..d5d6f2a
--- /dev/null
@@ -0,0 +1,49 @@
+#==================================================================================
+#   Copyright (C) 2024: OpenInfra Foundation Europe
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+#   platform project (RICP).
+#==================================================================================
+
+# Use Maven image with OpenJDK 17 for the build stage
+FROM maven:3.8.5-openjdk-17 AS maven_build
+
+# Copy Maven project files
+COPY pom.xml /tmp/
+COPY src /tmp/src/
+
+# Set working directory
+WORKDIR /tmp/
+
+# Build the Maven project
+RUN mvn package
+
+# Use a separate image with OpenJDK 17 for the runtime stage
+FROM openjdk:17-jdk-slim
+
+# Expose port 8081
+EXPOSE 8081
+
+ARG KAFKA_SERVERS
+ENV KAFKA_SERVERS=${KAFKA_SERVERS}
+
+# Set the working directory
+WORKDIR /app
+
+# Copy the JAR file from the maven_build stage to the runtime stage
+COPY --from=maven_build /tmp/target/consumer-0.0.1.jar /app/consumer-0.0.1.jar
+
+# Command to run the application
+CMD ["java", "-jar", "consumer-0.0.1.jar"]
diff --git a/sample-services/ics-producer-consumer/consumer/Makefile b/sample-services/ics-producer-consumer/consumer/Makefile
new file mode 100644 (file)
index 0000000..4e87cd3
--- /dev/null
@@ -0,0 +1,51 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+# Define variables
+IMAGE_NAME = "o-ran-sc/nonrtric-sample-icsconsumer"
+DOCKERFILE = Dockerfile
+
+# Default target
+.PHONY: all
+all: build run
+
+# Target to build the Maven JAR
+.PHONY: jar
+jar:
+       mvn clean package
+
+# Target to build the Docker image
+.PHONY: build
+build:
+       docker build -t $(IMAGE_NAME) -f $(DOCKERFILE) .
+
+# Target to run the Docker container
+.PHONY: run
+run:
+       docker run -p 8081:8081 $(IMAGE_NAME)
+
+# Target to stop and remove the Docker container
+.PHONY: stop
+stop:
+       docker stop $(IMAGE_NAME) || true
+       docker rm $(IMAGE_NAME) || true
+
+# Target to clean up
+.PHONY: clean
+clean: stop
+       docker rmi $(IMAGE_NAME) || true
diff --git a/sample-services/ics-producer-consumer/consumer/container-tag.yaml b/sample-services/ics-producer-consumer/consumer/container-tag.yaml
new file mode 100644 (file)
index 0000000..48290af
--- /dev/null
@@ -0,0 +1,23 @@
+#  ========================LICENSE_START=================================\r
+#  O-RAN-SC\r
+#\r
+#  Copyright (C) 2024: OpenInfra Foundation Europe\r
+#  ========================================================================\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#       http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+#  ============LICENSE_END=================================================\r
+\r
+# The Jenkins job requires a tag to build the Docker image.\r
+# By default this file is in the docker build directory,\r
+# but the location can configured in the JJB template.\r
+---\r
+tag: 0.0.1
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/consumer/pom.xml b/sample-services/ics-producer-consumer/consumer/pom.xml
new file mode 100644 (file)
index 0000000..84a6ae8
--- /dev/null
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>3.2.3</version>
+        <relativePath/>
+    </parent>
+
+    <groupId>com.demo.consumer</groupId>
+    <artifactId>consumer</artifactId>
+    <version>0.0.1</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.7.0</version>
+            <!--<scope>provided</scope> -->
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+           <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.24</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.10.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <mainClass>com.demo.consumer.Application</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java
new file mode 100644 (file)
index 0000000..0ba278b
--- /dev/null
@@ -0,0 +1,57 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+import com.demo.consumer.messages.KafkaMessageHandlerImpl;
+
+@SpringBootApplication
+@EnableAsync
+public class Application {
+
+    @Autowired
+    private SimpleConsumer simpleConsumer;
+
+    @Value("${vars.autostart:false}")
+    private boolean autostart;
+
+    @Value("${vars.topic}")
+    private String topic;
+
+    public static void main(String[] args) throws Exception {
+        SpringApplication.run(Application.class, args);
+    }
+
+    @EventListener(ApplicationReadyEvent.class)
+    public void checkAutoRun() throws Exception{
+        if (autostart) {
+            simpleConsumer.runAlways(topic, new KafkaMessageHandlerImpl());
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java
new file mode 100644 (file)
index 0000000..430ab59
--- /dev/null
@@ -0,0 +1,144 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.demo.consumer.messages.AbstractSimpleKafka;
+import com.demo.consumer.messages.KafkaMessageHandler;
+import com.demo.consumer.messages.MessageHelper;
+import com.demo.consumer.messages.PropertiesHelper;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Properties;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Component
+@Setter
+@Getter
+public class SimpleConsumer extends AbstractSimpleKafka {
+    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
+
+    private KafkaConsumer<String, String> kafkaConsumer = null;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    @Value("${vars.time:2000}")
+    private int TIME_OUT_MS;
+
+    public void run(String topicName, int numberOfRecords, KafkaMessageHandler callback) throws Exception {
+        Properties props = PropertiesHelper.getProperties();
+        // See if the number of records is provided
+        Optional<Integer> recs = Optional.ofNullable(numberOfRecords);
+
+        // adjust the number of records to get accordingly
+        Integer numOfRecs = recs.orElseGet(() -> Integer.parseInt(props.getProperty("max.poll.records")));
+        props.setProperty("max.poll.records", String.valueOf(numOfRecs));
+
+        // create the consumer
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+        // make the consumer available for graceful shutdown
+        setKafkaConsumer(consumer);
+        consumer.assign(Collections.singleton(new TopicPartition(topicName, 0)));
+        //consumer.seekToBeginning(consumer.assignment()); //--from-beginning
+        int recNum = numOfRecs;
+        while (recNum > 0) {
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(TIME_OUT_MS));
+            recNum = records.count();
+            if (recNum == 0) {
+                log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
+                break;
+            }
+
+            for (ConsumerRecord<String, String> record : records) {
+                callback.processMessage(topicName, record);
+                recNum--;
+            }
+        }
+        consumer.close();
+    }
+
+    private void close() throws Exception {
+        if (this.getKafkaConsumer() == null) {
+            log.info(MessageHelper.getSimpleJSONObject("The internal consumer is NULL").toJSONString());
+            return;
+        }
+        log.info(MessageHelper.getSimpleJSONObject("Closing consumer").toJSONString());
+        this.getKafkaConsumer().close();
+    }
+
+    public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
+        Properties props = PropertiesHelper.getProperties();
+        // make the consumer available for graceful shutdown
+        setKafkaConsumer(new KafkaConsumer<>(props));
+
+        // keep running forever or until shutdown() is called from another thread.
+        try {
+            getKafkaConsumer().subscribe(List.of(topicName));
+            while (!closed.get()) {
+                ConsumerRecords<String, String> records = getKafkaConsumer().poll(Duration.ofMillis(TIME_OUT_MS));
+                if (records.count() == 0) {
+                    log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
+                }
+
+                for (ConsumerRecord<String, String> record : records) {
+                    callback.processMessage(topicName, record);
+                    log.info(MessageHelper.getSimpleJSONObject("Topic: " + topicName + "Message: " + record.value()).toJSONString());
+                }
+            }
+        } catch (WakeupException e) {
+            // Ignore exception if closing
+            if (!closed.get())
+                throw e;
+        }
+    }
+
+    public void shutdown() {
+        closed.set(true);
+        try {
+            log.info(MessageHelper.getSimpleJSONObject("Shutting down consumer").toJSONString());
+        } catch (Exception e) {
+        }
+        getKafkaConsumer().wakeup();
+    }
+
+    public KafkaConsumer<String, String> getKafkaConsumer() {
+        return kafkaConsumer;
+    }
+
+    public void setKafkaConsumer(KafkaConsumer<String, String> kafkaConsumer) {
+        this.kafkaConsumer = kafkaConsumer;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java
new file mode 100644 (file)
index 0000000..63cc215
--- /dev/null
@@ -0,0 +1,85 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.controllers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.demo.consumer.repository.InfoType;
+import com.demo.consumer.repository.InfoTypes;
+import com.demo.consumer.repository.Job.Parameters;
+import com.demo.consumer.dme.ConsumerJobInfo;
+import com.demo.consumer.dme.ConsumerStatusInfo;
+import com.demo.consumer.repository.Jobs;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+@RestController
+@RequestMapping(path = "/consumer", produces = "application/json")
+public class ConsumerController {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerController.class);
+
+    private static Gson gson = new GsonBuilder().create();
+
+    private final Jobs jobs;
+    private final InfoTypes types;
+
+    public ConsumerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
+        this.jobs = jobs;
+        this.types = types;
+        InfoType type1 = InfoType.builder().build();
+        Parameters p = Parameters.builder().build();
+        type1.setId("type1");
+        type1.setKafkaInputTopic("mytopic");
+        type1.setInputJobType("type1");
+        type1.setInputJobDefinition(p);
+        types.put(type1);
+    }
+
+    @PostMapping("/job/{infoJobId}")
+    public void startinfojob(@RequestBody String requestBody, @PathVariable String infoJobId) {
+        ConsumerJobInfo request = gson.fromJson(requestBody, ConsumerJobInfo.class);
+        log.info("Add Job Info" + infoJobId, request);
+        try {
+            this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner,
+                    toJobParameters(request.jobDefinition));
+        } catch (Exception e) {
+            log.error("Error adding the job" + infoJobId, e.getMessage());
+        }
+    }
+
+    @PostMapping("/info-type-status")
+    public void statusChange(@RequestBody String requestBody) {
+        ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class);
+        log.info("Add Status Job Info", request);
+    }
+
+    private Parameters toJobParameters(Object jobData) {
+        String json = gson.toJson(jobData);
+        return gson.fromJson(json, Parameters.class);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java
new file mode 100644 (file)
index 0000000..f668e3a
--- /dev/null
@@ -0,0 +1,93 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.controllers;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+import com.demo.consumer.messages.ApplicationMessageHandlerImpl;
+
+@RestController
+public class ThreadsController {
+    private static final Logger log = LoggerFactory.getLogger(ThreadsController.class);
+
+    @Autowired
+    private SimpleConsumer simpleConsumer;
+
+    private Thread consumerThread;
+
+    @Async
+    @GetMapping("/startConsumer/{topicName}")
+    public CompletableFuture<String> startConsumer(@PathVariable("topicName") String topicName) {
+        try {
+            Thread consumerThread = new Thread(() -> {
+                try {
+                    simpleConsumer.runAlways(topicName, new ApplicationMessageHandlerImpl());
+                } catch (Exception e) {
+                    log.error("Error starting consuming on: " + topicName, e.getMessage());
+                }
+            });
+            consumerThread.start();
+            return CompletableFuture.completedFuture("Consumer started for topic: " + topicName);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    @GetMapping("/stopConsumer")
+    public String stopConsumer() {
+        if (consumerThread != null && consumerThread.isAlive()) {
+            try {
+                simpleConsumer.shutdown();
+            } catch (Exception e) {
+                log.error("Error stopping consumer Thread", e.getMessage());
+            }
+            return "Consumer stopped successfully";
+        } else {
+            return "No active consumer to stop";
+        }
+    }
+
+    @GetMapping("/listen/{numberOfMessages}/on/{topicName}")
+    public CompletableFuture<String> listenMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) {
+        try {
+            Thread consumerThread = new Thread(() -> {
+                try {
+                    simpleConsumer.run(topicName, numberOfMessages, new ApplicationMessageHandlerImpl());
+                } catch (Exception e) {
+                    log.error("Error starting consuming on: " + topicName, e.getMessage());
+                }
+            });
+            consumerThread.start();
+            return CompletableFuture.completedFuture("Consumer started for topic: " + topicName);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java
new file mode 100644 (file)
index 0000000..b3cd416
--- /dev/null
@@ -0,0 +1,52 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConsumerJobInfo {
+
+    @SerializedName("info_type_id")
+    @JsonProperty(value = "info_type_id", required = true)
+    public String infoTypeId = "";
+
+    @SerializedName("job_owner")
+    @JsonProperty(value = "job_owner", required = true)
+    public String owner = "";
+
+    @SerializedName("job_definition")
+    @JsonProperty(value = "job_definition", required = true)
+    public Object jobDefinition;
+
+    @SerializedName("job_result_uri")
+    @JsonProperty(value = "job_result_uri", required = true)
+    public String jobResultUri = "";
+
+    @SerializedName("status_notification_uri")
+    @JsonProperty(value = "status_notification_uri", required = false)
+    public String statusNotificationUri = "";
+
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java
new file mode 100644 (file)
index 0000000..46f1a3f
--- /dev/null
@@ -0,0 +1,47 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.dme;
+
+import java.util.Collection;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConsumerStatusInfo {
+
+    public enum InfoJobStatusValues {
+        REGISTERED, UNREGISTERED
+    }
+
+    @SerializedName("info_job_status")
+    @JsonProperty(value = "info_job_status", required = true)
+    public InfoJobStatusValues state;
+
+    @SerializedName("producers")
+    @JsonProperty(value = "producers", required = true)
+    public Collection<String> producers;
+
+}
+
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java
new file mode 100644 (file)
index 0000000..4f817a9
--- /dev/null
@@ -0,0 +1,42 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractSimpleKafka {
+    private static final Logger log = LoggerFactory.getLogger(AbstractSimpleKafka.class);
+
+    public AbstractSimpleKafka(){
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                    shutdown();
+            }
+        });
+        log.info(MessageHelper.getSimpleJSONObject("Created the Shutdown Hook").toJSONString());
+    }
+
+    public abstract void shutdown();
+    public abstract void runAlways(String topicName, KafkaMessageHandler callback) throws Exception;
+    public abstract void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception;
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java
new file mode 100644 (file)
index 0000000..ed4a930
--- /dev/null
@@ -0,0 +1,39 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ApplicationMessageHandlerImpl implements KafkaMessageHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(ApplicationMessageHandlerImpl.class);
+
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        String source = KafkaMessageHandlerImpl.class.getName();
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value());
+        System.out.println(obj.toJSONString());
+        log.info(obj.toJSONString());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java
new file mode 100644 (file)
index 0000000..9c16f9e
--- /dev/null
@@ -0,0 +1,28 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+@FunctionalInterface
+public interface KafkaMessageHandler {
+    void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception;
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java
new file mode 100644 (file)
index 0000000..8ff36fe
--- /dev/null
@@ -0,0 +1,37 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.json.simple.JSONObject;
+
+public class KafkaMessageHandlerImpl implements KafkaMessageHandler {
+    private static final Logger log = LoggerFactory.getLogger(KafkaMessageHandlerImpl.class);
+
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        String source = KafkaMessageHandlerImpl.class.getName();
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value());
+        log.info(obj.toJSONString());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java
new file mode 100644 (file)
index 0000000..21063cc
--- /dev/null
@@ -0,0 +1,73 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import java.util.Properties;
+import java.util.Random;
+import org.json.simple.JSONObject;
+
+/**
+ * The type Message helper.
+ */
+public class MessageHelper {
+
+    private static Properties props;
+
+    public static String getRandomString() {
+        int leftLimit = 48; // numeral '0'
+        int rightLimit = 122; // letter 'z'
+        int targetStringLength = 10;
+        Random random = new Random();
+
+        return random.ints(leftLimit, rightLimit + 1).filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
+                .limit(targetStringLength)
+                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
+    }
+
+    @SuppressWarnings("unchecked") // Using only strings
+    public static JSONObject getMessageLogEntryJSON(String source, String topic, String key, String message)
+            throws Exception {
+        JSONObject obj = new JSONObject();
+        String bootstrapServers = getProperties().getProperty("bootstrap.servers");
+        obj.put("bootstrapServers", bootstrapServers);
+        obj.put("source", source);
+        obj.put("topic", topic);
+        obj.put("key", key);
+        obj.put("message", message);
+
+        return obj;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static JSONObject getSimpleJSONObject(String message) {
+        JSONObject obj = new JSONObject();
+        obj.put("message", message);
+        return obj;
+    }
+
+    protected static Properties getProperties() throws Exception {
+        if (props == null) {
+            props = PropertiesHelper.getProperties();
+        }
+        return props;
+    }
+
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java
new file mode 100644 (file)
index 0000000..18be2f8
--- /dev/null
@@ -0,0 +1,58 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+
+@Component
+public class PropertiesHelper {
+    private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+
+    public static Properties getProperties() throws Exception {
+        Properties props = null;
+        try (InputStream input = SimpleConsumer.class.getClassLoader().getResourceAsStream("config.properties")) {
+            props = new Properties();
+            if (input == null) {
+                log.error("Found no configuration file in resources");
+                throw new Exception("Sorry, unable to find config.properties");
+            }
+            props.load(input);
+            String kafkaServers = System.getenv("KAFKA_SERVERS");
+            if (kafkaServers != null) {
+                props.setProperty("bootstrap.servers", kafkaServers);
+                log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+            } else {
+                log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+            }
+        } catch (IOException e) {
+            log.error("Error reading configuration file: ", e.getMessage());
+        }
+        return props;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java
new file mode 100644 (file)
index 0000000..45b4c7c
--- /dev/null
@@ -0,0 +1,49 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@ToString
+@Builder(access = AccessLevel.PUBLIC)
+public class InfoType {
+
+    @Getter
+    @Setter
+    private String id;
+
+    @Getter
+    @Setter
+    private String kafkaInputTopic;
+
+    @Getter
+    @Setter
+    private String inputJobType;
+
+    @Getter
+    @Setter
+    private Object inputJobDefinition;
+
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java
new file mode 100644 (file)
index 0000000..f1e83be
--- /dev/null
@@ -0,0 +1,81 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+
+@Component
+public class InfoTypes {
+    private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class);
+
+    private Map<String, InfoType> allTypes = new HashMap<>();
+
+    public InfoTypes(Collection<InfoType> types) {
+        for (InfoType type : types) {
+            put(type);
+        }
+    }
+
+    public synchronized InfoType get(String id) {
+        return allTypes.get(id);
+    }
+
+    public synchronized InfoType getType(String id) throws Exception {
+        InfoType type = allTypes.get(id);
+        if (type == null) {
+            throw new Exception("Could not find type: " + id + HttpStatus.NOT_FOUND.toString());
+        }
+        return type;
+    }
+
+    public static class ConfigFile {
+        Collection<InfoType> types;
+    }
+
+    public synchronized void put(InfoType type) {
+        logger.debug("Put type: {}", type.getId());
+        allTypes.put(type.getId(), type);
+    }
+
+    public synchronized Iterable<InfoType> getAll() {
+        return new Vector<>(allTypes.values());
+    }
+
+    public synchronized Collection<String> typeIds() {
+        return allTypes.keySet();
+    }
+
+    public synchronized int size() {
+        return allTypes.size();
+    }
+
+    public synchronized void clear() {
+        allTypes.clear();
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java
new file mode 100644 (file)
index 0000000..ba12345
--- /dev/null
@@ -0,0 +1,71 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+@ToString
+public class Job {
+    @Builder
+    public static class Parameters {
+
+        @Builder
+        @EqualsAndHashCode
+        public static class KafkaDeliveryInfo {
+            @Getter
+            private String topic;
+
+            @Getter
+            private String bootStrapServers;
+
+            @JsonProperty(value = "numberOfMessages")
+            @Getter
+            private int numberOfMessages;
+        }
+
+        @Getter
+        private KafkaDeliveryInfo deliveryInfo;
+    }
+
+    @Getter
+    private final String id;
+
+    @Getter
+    private final InfoType type;
+
+    @Getter
+    private final String owner;
+
+    @Getter
+    private final Parameters parameters;
+
+    public Job(String id, InfoType type, String owner, Parameters parameters) {
+        this.id = id;
+        this.type = type;
+        this.owner = owner;
+        this.parameters = parameters;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java
new file mode 100644 (file)
index 0000000..13f3180
--- /dev/null
@@ -0,0 +1,83 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.consumer.repository.Job.Parameters;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+@Component
+public class Jobs {
+    private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
+
+    private Map<String, Job> allJobs = new HashMap<>();
+
+    public Jobs() {
+    }
+
+    public synchronized Job getJob(String id) throws Exception {
+        Job job = allJobs.get(id);
+        if (job == null) {
+            throw new Exception("Could not find job: " + id);
+        }
+        return job;
+    }
+
+    public synchronized Job get(String id) {
+        return allJobs.get(id);
+    }
+
+    public void addJob(String id, InfoType type, String owner, Parameters parameters) {
+        Job job = new Job(id, type, owner, parameters);
+        this.put(job);
+    }
+
+    private synchronized void put(Job job) {
+        logger.debug("Put job: {}", job.getId());
+        allJobs.put(job.getId(), job);
+    }
+
+    public synchronized Iterable<Job> getAll() {
+        return new Vector<>(allJobs.values());
+    }
+
+    public synchronized int size() {
+        return allJobs.size();
+    }
+
+    public synchronized Job delete(String id) {
+        return allJobs.remove(id);
+    }
+
+    @Override
+    public String toString() {
+        Gson gson = new GsonBuilder().create();
+        return gson.toJson(allJobs);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml
new file mode 100644 (file)
index 0000000..eed8326
--- /dev/null
@@ -0,0 +1,31 @@
+#  ========================LICENSE_START=================================
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+server:
+  port : 8081
+
+vars:
+  time: 1000
+  autostart: true
+  topic: mytopic #This topic is used only in autostart
+
+spring:
+  application:
+    name: demoConsumer
+
+logging:
+  level:
+    root: INFO
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties b/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties
new file mode 100644 (file)
index 0000000..48763d6
--- /dev/null
@@ -0,0 +1,42 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+# The location of the Kafka server
+bootstrap.servers=${KAFKA_SERVERS:localhost:9092}
+
+# the default group ID
+group.id=test-group
+
+# the default topic to use if one is not provided
+default.topic=magic-topic
+
+# The number of records to pull of the stream every time
+# the client takes a trip out to Kafka
+max.poll.records=10
+
+# Make Kafka keep track of record reads by the consumer
+enable.auto.commit=true
+
+# The time in milliseconds to Kafka write the offset of the last message read
+auto.commit.interval.ms=500
+
+# classes for serializing and deserializing messages
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+enable.idempotence=false
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml b/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml
new file mode 100644 (file)
index 0000000..dd90b77
--- /dev/null
@@ -0,0 +1,45 @@
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+
+<configuration>
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>logging.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>logging.%i.log.zip</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>10MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+        </encoder>
+    </appender>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+        </encoder>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="FILE"/>
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
diff --git a/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java b/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java
new file mode 100644 (file)
index 0000000..7a7c279
--- /dev/null
@@ -0,0 +1,86 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+import com.demo.consumer.messages.KafkaMessageHandler;
+import com.demo.consumer.messages.PropertiesHelper;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.time.Duration;
+
+import java.util.Properties;
+
+import static org.mockito.Mockito.*;
+
+class SimpleConsumerTest {
+
+    private static final long TIME_OUT_MS = 1000;
+
+    @Mock
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    @InjectMocks
+    private SimpleConsumer simpleConsumer;
+
+    private AutoCloseable closable;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        closable = MockitoAnnotations.openMocks(this);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        closable.close();
+    }
+
+    @Test
+    void testRun() throws Exception {
+        // Mocking the properties object returned by PropertiesHelper.getProperties()
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", "localhost:9092");
+        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+        // Mocking PropertiesHelper.getProperties() to return the mocked Properties object
+        try (MockedStatic<PropertiesHelper> propertiesHelperMockedStatic = Mockito.mockStatic(PropertiesHelper.class)) {
+            propertiesHelperMockedStatic.when(PropertiesHelper::getProperties).thenReturn(properties);
+
+            String topicName = "testTopic";
+            int numberOfMessages = 10;
+            KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
+
+            simpleConsumer.run(topicName, numberOfMessages, callback);
+            verify(kafkaConsumer, times(0)).poll(Duration.ofMillis(TIME_OUT_MS));
+        }
+    }
+
+}
diff --git a/sample-services/ics-producer-consumer/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose.yaml
new file mode 100644 (file)
index 0000000..d9239bc
--- /dev/null
@@ -0,0 +1,83 @@
+#  ========================LICENSE_START=================================\r
+#  O-RAN-SC\r
+#\r
+#  Copyright (C) 2024: OpenInfra Foundation Europe\r
+#  ========================================================================\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#       http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+#  ============LICENSE_END=================================================\r
+\r
+version: '2'\r
+\r
+networks:\r
+  my-network:\r
+    name: kafka\r
+    driver: bridge\r
+\r
+services:\r
+  kafka-zkless:\r
+    container_name: kafka-zkless\r
+    image: quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64\r
+    command:\r
+      [\r
+        "sh",\r
+        "-c",\r
+        "export CLUSTER_ID=$$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $$CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override listener.security.protocol.map=$${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} --override listeners=$${KAFKA_LISTENERS}",\r
+      ]\r
+    ports:\r
+      - "9092:9092"\r
+    environment:\r
+      LOG_DIR: "/tmp/logs"\r
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT\r
+      KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093\r
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-zkless:29092,PLAINTEXT_HOST://kafka-zkless:9092\r
+    tty: true\r
+    stdin_open: true\r
+    networks:\r
+      - my-network\r
+\r
+  informationcoordinator:\r
+    image: nexus3.o-ran-sc.org:10001/o-ran-sc/nonrtric-plt-informationcoordinatorservice:1.6.0\r
+    container_name: informationcoordinatorservice\r
+    ports:\r
+      - "8083:8083"\r
+    volumes:\r
+      - ./application.yaml:/opt/app/information-coordinator-service/config/application.yaml\r
+    networks:\r
+      - my-network\r
+\r
+  kafka-producer:\r
+    image: o-ran-sc/nonrtric-sample-icsproducer:latest\r
+    container_name: kafka-producer\r
+    environment:\r
+      - KAFKA_SERVERS=kafka-zkless:9092\r
+    ports:\r
+      - "8080:8080"\r
+    networks:\r
+      - my-network\r
+\r
+  kafka-consumer:\r
+    image: o-ran-sc/nonrtric-sample-icsconsumer:latest\r
+    container_name: kafka-consumer\r
+    environment:\r
+      - KAFKA_SERVERS=kafka-zkless:9092\r
+    ports:\r
+      - "8081:8081"\r
+    networks:\r
+      - my-network\r
+\r
+  curl-client:\r
+    image: curlimages/curl:latest\r
+    container_name: curl-client\r
+    command: ["tail", "-f", "/dev/null"]\r
+    networks:\r
+      - my-network\r
diff --git a/sample-services/ics-producer-consumer/docker-compose/.env b/sample-services/ics-producer-consumer/docker-compose/.env
new file mode 100644 (file)
index 0000000..257ac73
--- /dev/null
@@ -0,0 +1,24 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+#CONTROL_PANEL
+CONTROL_PANEL_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-controlpanel"
+CONTROL_PANEL_IMAGE_TAG="2.5.0"
+
+#NONRTRIC_GATEWAY
+NONRTRIC_GATEWAY_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-gateway"
+NONRTRIC_GATEWAY_IMAGE_TAG="1.2.0"
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt b/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt
new file mode 100644 (file)
index 0000000..2496663
--- /dev/null
@@ -0,0 +1,14 @@
+LICENSE.txt
+
+Unless otherwise specified, all software contained herein is licensed
+under the Apache License, Version 2.0 (the "Software License");
+you may not use this software except in compliance with the Software
+License. You may obtain a copy of the Software License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the Software License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the Software License for the specific language governing permissions
+and limitations under the Software License.
diff --git a/sample-services/ics-producer-consumer/docker-compose/README.md b/sample-services/ics-producer-consumer/docker-compose/README.md
new file mode 100644 (file)
index 0000000..d7542a8
--- /dev/null
@@ -0,0 +1,35 @@
+# License
+
+Copyright (C) 2020 Nordix Foundation.
+Licensed under the Apache License, Version 2.0 (the "License")
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+For more information about license please see the [LICENSE](LICENSE.txt) file for details.
+
+## O-RAN-SC docker-compose files
+
+The docker compose file helps the user to deploy the components of nonrtric control panel with one command.
+
+NOTE:
+docker image urls & tags are in file ```.env```
+
+To install the Control Panel and gateway, run the following command:
+
+```shell
+docker-compose --env-file .env -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml up -d
+```
+
+To remove the containers, use the command:
+
+```shell
+docker-compose --env-file .env -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml down
+```
diff --git a/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf b/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf
new file mode 100644 (file)
index 0000000..66e8ac4
--- /dev/null
@@ -0,0 +1,45 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+
+events{}
+
+http {
+    include /etc/nginx/mime.types;
+    resolver 127.0.0.11;
+    server {
+        listen 8080;
+        server_name localhost;
+        root /usr/share/nginx/html;
+        index index.html;
+        location /a1-policy/ {
+            set $upstream nonrtric-gateway;
+            proxy_pass http://$upstream:9090;
+        }
+        location /data-producer/{
+            set $upstream nonrtric-gateway;
+            proxy_pass http://$upstream:9090;
+        }
+        location /data-consumer/{
+            set $upstream nonrtric-gateway;
+            proxy_pass http://$upstream:9090;
+        }
+        location / {
+            try_files $uri $uri/ /index.html;
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml
new file mode 100644 (file)
index 0000000..a42b7a8
--- /dev/null
@@ -0,0 +1,32 @@
+#  Copyright (C) 2020 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+version: '3.5'
+
+networks:
+  kafka:
+    external: true
+
+services:
+  policy-control-panel:
+    image: "${CONTROL_PANEL_IMAGE_BASE}:${CONTROL_PANEL_IMAGE_TAG}"
+    container_name: policy-control-panel
+    networks:
+      - kafka
+    ports:
+      - 8181:8080
+      - 8282:8082
+    volumes:
+      - ./control-panel/config/nginx.conf:/etc/nginx/nginx.conf:ro
diff --git a/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml
new file mode 100644 (file)
index 0000000..c30aa2c
--- /dev/null
@@ -0,0 +1,20 @@
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+version: '3.5'
+
+networks:
+  kafka:
+    external: true
diff --git a/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml
new file mode 100644 (file)
index 0000000..c620b03
--- /dev/null
@@ -0,0 +1,48 @@
+################################################################################
+#   Copyright (c) 2021 Nordix Foundation.                                      #
+#                                                                              #
+#   Licensed under the Apache License, Version 2.0 (the "License");            #
+#   you may not use this file except in compliance with the License.           #
+#   You may obtain a copy of the License at                                    #
+#                                                                              #
+#       http://www.apache.org/licenses/LICENSE-2.0                             #
+#                                                                              #
+#   Unless required by applicable law or agreed to in writing, software        #
+#   distributed under the License is distributed on an "AS IS" BASIS,          #
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+#   See the License for the specific language governing permissions and        #
+#   limitations under the License.                                             #
+################################################################################
+
+server:
+  port: 9090
+spring:
+  cloud:
+    gateway:
+      httpclient:
+        ssl:
+          useInsecureTrustManager: true
+        wiretap: true
+      httpserver:
+        wiretap: true
+      routes:
+      - id: A1-EI
+        uri: http://informationcoordinatorservice:8083
+        predicates:
+        - Path=/data-producer/**,/data-consumer/**
+management:
+  endpoint:
+    gateway:
+      enabled: true
+  endpoints:
+    web:
+      exposure:
+        include: "gateway,loggers,logfile,health,info,metrics,threaddump,heapdump"
+logging:
+  level:
+    ROOT: ERROR
+    org.springframework: ERROR
+    org.springframework.cloud.gateway: INFO
+    reactor.netty: INFO
+  file:
+    name: /var/log/nonrtric-gateway/application.log
diff --git a/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml
new file mode 100644 (file)
index 0000000..138229d
--- /dev/null
@@ -0,0 +1,33 @@
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+version: '3.5'
+
+networks:
+  kafka:
+    external: true
+
+services:
+  nonrtric-gateway:
+    image: "${NONRTRIC_GATEWAY_IMAGE_BASE}:${NONRTRIC_GATEWAY_IMAGE_TAG}"
+    container_name: nonrtric-gateway
+    networks:
+      kafka:
+        aliases:
+          - nonrtric-gateway-container
+    ports:
+      - 9090:9090
+    volumes:
+      - ./nonrtric-gateway/config/application-nonrtricgateway.yaml:/opt/app/nonrtric-gateway/config/application.yaml:ro
diff --git a/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml b/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml
new file mode 100644 (file)
index 0000000..0184de4
--- /dev/null
@@ -0,0 +1,38 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+version: '2'
+networks:
+  kafka:
+    external: true
+
+services:
+  redpanda-console:
+    container_name: redpanda-console
+    image: docker.redpanda.com/redpandadata/console:v2.4.5
+    entrypoint: /bin/sh
+    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
+    environment:
+      CONFIG_FILEPATH: /tmp/config.yml
+      CONSOLE_CONFIG_FILE: |
+        kafka:
+          brokers: ["kafka-zkless:9092"]
+    ports:
+      - 8888:8080
+    networks:
+      - kafka
diff --git a/sample-services/ics-producer-consumer/producer/Dockerfile b/sample-services/ics-producer-consumer/producer/Dockerfile
new file mode 100644 (file)
index 0000000..148872a
--- /dev/null
@@ -0,0 +1,49 @@
+#==================================================================================
+#   Copyright (C) 2024: OpenInfra Foundation Europe
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+#   platform project (RICP).
+#==================================================================================
+
+# Use Maven image with OpenJDK 17 for the build stage
+FROM maven:3.8.5-openjdk-17 AS maven_build
+
+# Copy Maven project files
+COPY pom.xml /tmp/
+COPY src /tmp/src/
+
+# Set working directory
+WORKDIR /tmp/
+
+# Build the Maven project
+RUN mvn package
+
+# Use a separate image with OpenJDK 17 for the runtime stage
+FROM openjdk:17-jdk-slim
+
+# Expose port 8080
+EXPOSE 8080
+
+ARG KAFKA_SERVERS
+ENV KAFKA_SERVERS=${KAFKA_SERVERS}
+
+# Set the working directory
+WORKDIR /app
+
+# Copy the JAR file from the maven_build stage to the runtime stage
+COPY --from=maven_build /tmp/target/producer-0.0.1.jar /app/producer-0.0.1.jar
+
+# Command to run the application
+CMD ["java", "-jar", "producer-0.0.1.jar"]
diff --git a/sample-services/ics-producer-consumer/producer/Makefile b/sample-services/ics-producer-consumer/producer/Makefile
new file mode 100644 (file)
index 0000000..301526a
--- /dev/null
@@ -0,0 +1,51 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+# Define variables
+IMAGE_NAME = "o-ran-sc/nonrtric-sample-icsproducer"
+DOCKERFILE = Dockerfile
+
+# Default target
+.PHONY: all
+all: build run
+
+# Target to build the Maven JAR
+.PHONY: jar
+jar:
+       mvn clean package
+
+# Target to build the Docker image
+.PHONY: build
+build:
+       docker build -t $(IMAGE_NAME) -f $(DOCKERFILE) .
+
+# Target to run the Docker container
+.PHONY: run
+run:
+       docker run -p 8080:8080 $(IMAGE_NAME)
+
+# Target to stop and remove the Docker container
+.PHONY: stop
+stop:
+       docker stop $(IMAGE_NAME) || true
+       docker rm $(IMAGE_NAME) || true
+
+# Target to clean up
+.PHONY: clean
+clean: stop
+       docker rmi $(IMAGE_NAME) || true
diff --git a/sample-services/ics-producer-consumer/producer/container-tag.yaml b/sample-services/ics-producer-consumer/producer/container-tag.yaml
new file mode 100644 (file)
index 0000000..48290af
--- /dev/null
@@ -0,0 +1,23 @@
+#  ========================LICENSE_START=================================\r
+#  O-RAN-SC\r
+#\r
+#  Copyright (C) 2024: OpenInfra Foundation Europe\r
+#  ========================================================================\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#       http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+#  ============LICENSE_END=================================================\r
+\r
+# The Jenkins job requires a tag to build the Docker image.\r
+# By default this file is in the docker build directory,\r
+# but the location can configured in the JJB template.\r
+---\r
+tag: 0.0.1
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/producer/pom.xml b/sample-services/ics-producer-consumer/producer/pom.xml
new file mode 100644 (file)
index 0000000..d78f370
--- /dev/null
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>3.2.3</version>
+        <relativePath/>
+    </parent>
+
+    <groupId>com.demo.producer</groupId>
+    <artifactId>producer</artifactId>
+    <version>0.0.1</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.7.0</version>
+            <!--<scope>provided</scope> -->
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+           <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.24</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.10.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <mainClass>com.demo.producer.Application</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java
new file mode 100644 (file)
index 0000000..fc389f4
--- /dev/null
@@ -0,0 +1,56 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+import com.demo.producer.messages.KafkaMessageHandlerImpl;
+import com.demo.producer.producer.SimpleProducer;
+
+@SpringBootApplication
+@EnableAsync
+public class Application {
+
+    @Autowired
+    private SimpleProducer simpleProducer;
+
+    @Value("${vars.autostart:false}")
+    private boolean autostart;
+
+    @Value("${vars.topic}")
+    private String topic;
+    public static void main(String[] args) throws Exception {
+        SpringApplication.run(Application.class, args);
+    }
+
+    @EventListener(ApplicationReadyEvent.class)
+    public void checkAutoRun() throws Exception{
+        if (autostart) {
+            simpleProducer.runAlways(topic, new KafkaMessageHandlerImpl());
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java
new file mode 100644 (file)
index 0000000..5bc6821
--- /dev/null
@@ -0,0 +1,137 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.controllers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.demo.producer.repository.InfoType;
+import com.demo.producer.repository.InfoTypes;
+import com.demo.producer.repository.Job.Parameters;
+import com.demo.producer.repository.Jobs;
+import com.demo.producer.dme.ProducerJobInfo;
+import com.demo.producer.messages.KafkaMessageHandlerImpl;
+import com.demo.producer.producer.SimpleProducer;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+@RestController
+@RequestMapping(path = "/producer", produces = "application/json")
+public class ProducerController {
+    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
+
+    private static Gson gson = new GsonBuilder().create();
+
+    private final Jobs jobs;
+    private final InfoTypes types;
+    private String topicName = "mytopic";
+
+
+    public ProducerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
+        this.jobs = jobs;
+        this.types = types;
+        InfoType type1 = InfoType.builder().build();
+        type1.setId("type1");
+        type1.setKafkaInputTopic(topicName);
+        type1.setInputJobType("type1");
+        type1.setInputJobDefinition(null);
+        types.put(type1);
+    }
+
+    @GetMapping("/publish/{numberOfMessages}")
+    public ResponseEntity<?> publishMessage(@PathVariable int numberOfMessages) {
+        try {
+            new SimpleProducer().run(topicName, numberOfMessages, new KafkaMessageHandlerImpl());
+            return ResponseEntity.ok("message published successfully ..");
+        } catch (Exception ex) {
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
+                    .build();
+        }
+    }
+
+    @PostMapping("/job/{infoJobId}")
+    public void jobCallback(@RequestBody String requestBody, @PathVariable String infoJobId) {
+        ProducerJobInfo request = gson.fromJson(requestBody, ProducerJobInfo.class);
+        try {
+            log.info("Adding producer job info " + request.toString());
+            this.jobs.addJob(request.id, types.getType(request.typeId), request.owner,
+                    toJobParameters(request.jobData));
+        } catch (Exception e) {
+            log.error("Error adding producer job info: " + request.toString(), e.getMessage());
+        }
+    }
+
+    @PostMapping("/job")
+    public void jobCallbackNoId(@RequestBody String requestBody) {
+        ProducerJobInfo request = gson.fromJson(requestBody, ProducerJobInfo.class);
+        try {
+            log.info("Adding producer job info "+request.toString());
+            this.jobs.addJob(request.id, types.getType(request.typeId), request.owner,
+                    toJobParameters(request.jobData));
+        } catch (Exception e) {
+            log.error("Error adding producer job info: " + request.toString(), e.getMessage());
+        }
+    }
+
+    private Parameters toJobParameters(Object jobData) {
+        String json = gson.toJson(jobData);
+        return gson.fromJson(json, Parameters.class);
+    }
+
+    @GetMapping("/job")
+    public ResponseEntity<String> getJobs() {
+        try {
+            log.info("Get all jobs");
+            return new ResponseEntity<>(this.jobs.getAll().toString(), HttpStatus.OK);
+        } catch (Exception e) {
+            log.error("Error finding jobs", e.getMessage());
+            return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @DeleteMapping("/job/{infoJobId}")
+    public ResponseEntity<String> deleteJob(@PathVariable String infoJobId) {
+        try {
+            log.info("Delete Job" + infoJobId);
+            this.jobs.delete(infoJobId);
+            return new ResponseEntity<>("Deleted job:" + infoJobId, HttpStatus.OK);
+        } catch (Exception e) {
+            log.error("Error finding job " + infoJobId, e.getMessage());
+            return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @GetMapping("/supervision")
+    public ResponseEntity<String> getSupervision() {
+        log.info("Get Supervision");
+        return new ResponseEntity<>("Ok", HttpStatus.OK);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java
new file mode 100644 (file)
index 0000000..e503b84
--- /dev/null
@@ -0,0 +1,93 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.controllers;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+import com.demo.producer.messages.KafkaMessageHandlerImpl;
+import com.demo.producer.messages.ApplicationMessageHandlerImpl;
+import com.demo.producer.producer.SimpleProducer;
+
+@RestController
+public class ThreadsController {
+    private static final Logger log = LoggerFactory.getLogger(ThreadsController.class);
+
+    @Autowired
+    private SimpleProducer simpleProducer;
+
+    private Thread producerThread;
+
+    @Async
+    @GetMapping("/startProducer/{topicName}")
+    public CompletableFuture<String> startProducer(@PathVariable("topicName") String topicName) {
+        try {
+            producerThread = new Thread(() -> {
+                try {
+                    simpleProducer.runAlways(topicName, new ApplicationMessageHandlerImpl());
+                } catch (Exception e) {
+                    log.error("Error starting producer on: " + topicName, e.getMessage());
+                }
+            });
+            producerThread.start();
+            return CompletableFuture.completedFuture("Producer started for topic: " + topicName);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    @GetMapping("/stopProducer")
+    public String stopProducer() {
+        if (producerThread != null && producerThread.isAlive()) {
+            try {
+                simpleProducer.shutdown();
+            } catch (Exception e) {
+                log.error("Error stopping producer Thread", e.getMessage());
+            }
+            return "Producer stopped successfully";
+        } else {
+            return "No active producer to stop";
+        }
+    }
+
+    @GetMapping("/publish/{numberOfMessages}/on/{topicName}")
+    public CompletableFuture<String> publishNMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) {
+        try {
+            producerThread = new Thread(() -> {
+                try {
+                    simpleProducer.run(topicName, numberOfMessages, new KafkaMessageHandlerImpl());
+                } catch (Exception e) {
+                    log.error("Error producing " + numberOfMessages + "on " + topicName, e.getMessage());
+                }
+            });
+            producerThread.start();
+            return CompletableFuture.completedFuture("Producer started for topic: " + topicName);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java
new file mode 100644 (file)
index 0000000..5f87f4d
--- /dev/null
@@ -0,0 +1,41 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+public class ProducerInfoTypeInfo {
+
+    @SerializedName("info_job_data_schema")
+    @JsonProperty(value = "info_job_data_schema", required = true)
+    public Object jobDataSchema;
+
+    @SerializedName("info_type_information")
+    @JsonProperty(value = "info_type_information", required = true)
+    public Object typeSpecificInformation;
+
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java
new file mode 100644 (file)
index 0000000..c55d4c5
--- /dev/null
@@ -0,0 +1,53 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+public class ProducerJobInfo {
+
+    @SerializedName("info_job_identity")
+    @JsonProperty("info_job_identity")
+    public String id = "";
+
+    @SerializedName("info_type_identity")
+    @JsonProperty("info_type_identity")
+    public String typeId = "";
+
+    @SerializedName("info_job_data")
+    @JsonProperty("info_job_data")
+    public Object jobData;
+
+    @SerializedName("owner")
+    @JsonProperty("owner")
+    public String owner = "";
+
+    @SerializedName("last_updated")
+    @JsonProperty("last_updated")
+    public String lastUpdated = "";
+
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java
new file mode 100644 (file)
index 0000000..5f279d2
--- /dev/null
@@ -0,0 +1,47 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Collection;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AllArgsConstructor
+public class ProducerRegistrationInfo {
+
+    @SerializedName("supported_info_types")
+    @JsonProperty(value = "supported_info_types", required = true)
+    public Collection<String> supportedTypeIds;
+
+    @SerializedName("info_job_callback_url")
+    @JsonProperty(value = "info_job_callback_url", required = true)
+    public String jobCallbackUrl;
+
+    @SerializedName("info_producer_supervision_callback_url")
+    @JsonProperty(value = "info_producer_supervision_callback_url", required = true)
+    public String producerSupervisionCallbackUrl;
+
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java
new file mode 100644 (file)
index 0000000..d00d074
--- /dev/null
@@ -0,0 +1,43 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class AbstractSimpleKafka {
+    private static final Logger log = LoggerFactory.getLogger(AbstractSimpleKafka.class);
+
+    public AbstractSimpleKafka(){
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                    shutdown();
+            }
+        });
+        log.info(MessageHelper.getSimpleJSONObject("Created the Shutdown Hook").toJSONString());
+    }
+
+    public abstract void shutdown();
+    public abstract void runAlways(String topicName, KafkaMessageHandler callback) throws Exception;
+    public abstract void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception;
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java
new file mode 100644 (file)
index 0000000..beae794
--- /dev/null
@@ -0,0 +1,39 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ApplicationMessageHandlerImpl implements KafkaMessageHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(ApplicationMessageHandlerImpl.class);
+
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        String source = KafkaMessageHandlerImpl.class.getName();
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value());
+        System.out.println(obj.toJSONString());
+        log.info(obj.toJSONString());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java
new file mode 100644 (file)
index 0000000..814c88c
--- /dev/null
@@ -0,0 +1,28 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+@FunctionalInterface
+public interface KafkaMessageHandler {
+    void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception;
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java
new file mode 100644 (file)
index 0000000..06f7026
--- /dev/null
@@ -0,0 +1,37 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.json.simple.JSONObject;
+
+public class KafkaMessageHandlerImpl implements KafkaMessageHandler {
+    private static final Logger log = LoggerFactory.getLogger(KafkaMessageHandlerImpl.class);
+
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        String source = KafkaMessageHandlerImpl.class.getName();
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value());
+        log.info(obj.toJSONString());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java
new file mode 100644 (file)
index 0000000..7a5e45b
--- /dev/null
@@ -0,0 +1,71 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import java.util.Properties;
+import java.util.Random;
+import org.json.simple.JSONObject;
+
+public class MessageHelper {
+
+    private static Properties props;
+
+    public static String getRandomString() {
+        int leftLimit = 48; // numeral '0'
+        int rightLimit = 122; // letter 'z'
+        int targetStringLength = 10;
+        Random random = new Random();
+
+        return random.ints(leftLimit, rightLimit + 1).filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
+                .limit(targetStringLength)
+                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
+    }
+
+    @SuppressWarnings("unchecked") // Using only strings
+    public static JSONObject getMessageLogEntryJSON(String source, String topic, String key, String message)
+            throws Exception {
+        JSONObject obj = new JSONObject();
+        String bootstrapServers = getProperties().getProperty("bootstrap.servers");
+        obj.put("bootstrapServers", bootstrapServers);
+        obj.put("source", source);
+        obj.put("topic", topic);
+        obj.put("key", key);
+        obj.put("message", message);
+
+        return obj;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    public static JSONObject getSimpleJSONObject(String message){
+        JSONObject obj = new JSONObject();
+        obj.put("message", message);
+        return obj;
+    }
+
+    protected static Properties getProperties() throws Exception {
+        if (props == null) {
+            props = PropertiesHelper.getProperties();
+        }
+        return props;
+    }
+
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java
new file mode 100644 (file)
index 0000000..7dc2b1e
--- /dev/null
@@ -0,0 +1,58 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.producer.producer.SimpleProducer;
+
+@Component
+public class PropertiesHelper {
+    private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+
+    public static Properties getProperties() throws Exception {
+        Properties props = null;
+        try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) {
+            props = new Properties();
+            if (input == null) {
+                log.error("Found no configuration file in resources");
+                throw new Exception("Sorry, unable to find config.properties");
+            }
+            props.load(input);
+            String kafkaServers = System.getenv("KAFKA_SERVERS");
+            if (kafkaServers != null) {
+                props.setProperty("bootstrap.servers", kafkaServers);
+                log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+            } else {
+                log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+            }
+        } catch (IOException e) {
+            log.error("Error reading configuration file: ", e.getMessage());
+        }
+        return props;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java
new file mode 100644 (file)
index 0000000..6c9858b
--- /dev/null
@@ -0,0 +1,112 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.producer.producer;
+
+import java.util.UUID;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.json.simple.JSONObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import com.demo.producer.messages.AbstractSimpleKafka;
+import com.demo.producer.messages.KafkaMessageHandler;
+import com.demo.producer.messages.MessageHelper;
+import com.demo.producer.messages.PropertiesHelper;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Component
+@Getter
+@Setter
+public class SimpleProducer extends AbstractSimpleKafka {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);
+
+    @Value("${vars.time:1000}")
+    private int TIME;
+
+    private KafkaProducer<String, String> kafkaProducer;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception {
+        for (int i = 0; i < numberOfMessages; i++) {
+            String key = UUID.randomUUID().toString();
+            String message = MessageHelper.getRandomString();
+            if (this.getTopicName() == null) {
+                this.setTopicName(topicName);
+            }
+            this.send(topicName, key, message);
+            Thread.sleep(TIME);
+        }
+        this.shutdown();
+    }
+
+    public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
+        while (true) {
+            String key = UUID.randomUUID().toString();
+            String message = MessageHelper.getRandomString();
+            this.send(topicName, key, message);
+            Thread.sleep(TIME);
+        }
+    }
+
+    private String topicName = null;
+
+    private void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    private String getTopicName() {
+        return this.topicName;
+    }
+
+    protected void send(String topicName, String key, String message) throws Exception {
+        String source = SimpleProducer.class.getName();
+        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, message);
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, key, message);
+        log.info(obj.toJSONString());
+        getKafkaProducer().send(producerRecord);
+    }
+
+    private KafkaProducer<String, String> getKafkaProducer() throws Exception {
+        if (this.kafkaProducer == null) {
+            Properties props = PropertiesHelper.getProperties();
+            this.kafkaProducer = new KafkaProducer<>(props);
+        }
+        return this.kafkaProducer;
+    }
+
+    public void shutdown(){
+        closed.set(true);
+        try {
+            log.info(MessageHelper.getSimpleJSONObject("Shutting down producer").toJSONString());
+            getKafkaProducer().close();
+        } catch (Exception e) {
+            log.error("Error shutting down the Producer ", e.getMessage());
+        }
+
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java
new file mode 100644 (file)
index 0000000..9f9c0cf
--- /dev/null
@@ -0,0 +1,49 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@ToString
+@Builder(access = AccessLevel.PUBLIC)
+public class InfoType {
+
+    @Getter
+    @Setter
+    private String id;
+
+    @Getter
+    @Setter
+    private String kafkaInputTopic;
+
+    @Getter
+    @Setter
+    private String inputJobType;
+
+    @Getter
+    @Setter
+    private Object inputJobDefinition;
+
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java
new file mode 100644 (file)
index 0000000..b7a9914
--- /dev/null
@@ -0,0 +1,81 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+
+@Component
+public class InfoTypes {
+    private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class);
+
+    private Map<String, InfoType> allTypes = new HashMap<>();
+
+    public InfoTypes(Collection<InfoType> types) {
+        for (InfoType type : types) {
+            put(type);
+        }
+    }
+
+    public synchronized InfoType get(String id) {
+        return allTypes.get(id);
+    }
+
+    public synchronized InfoType getType(String id) throws Exception {
+        InfoType type = allTypes.get(id);
+        if (type == null) {
+            throw new Exception("Could not find type: " + id + HttpStatus.NOT_FOUND.toString());
+        }
+        return type;
+    }
+
+    public static class ConfigFile {
+        Collection<InfoType> types;
+    }
+
+    public synchronized void put(InfoType type) {
+        logger.debug("Put type: {}", type.getId());
+        allTypes.put(type.getId(), type);
+    }
+
+    public synchronized Iterable<InfoType> getAll() {
+        return new Vector<>(allTypes.values());
+    }
+
+    public synchronized Collection<String> typeIds() {
+        return allTypes.keySet();
+    }
+
+    public synchronized int size() {
+        return allTypes.size();
+    }
+
+    public synchronized void clear() {
+        allTypes.clear();
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java
new file mode 100644 (file)
index 0000000..cafddde
--- /dev/null
@@ -0,0 +1,71 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+@ToString
+public class Job {
+    @Builder
+    public static class Parameters {
+
+        @Builder
+        @EqualsAndHashCode
+        public static class KafkaDeliveryInfo {
+            @Getter
+            private String topic;
+
+            @Getter
+            private String bootStrapServers;
+
+            @JsonProperty(value = "numberOfMessages")
+            @Getter
+            private int numberOfMessages;
+        }
+
+        @Getter
+        private KafkaDeliveryInfo deliveryInfo;
+    }
+
+    @Getter
+    private final String id;
+
+    @Getter
+    private final InfoType type;
+
+    @Getter
+    private final String owner;
+
+    @Getter
+    private final Parameters parameters;
+
+    public Job(String id, InfoType type, String owner, Parameters parameters) {
+        this.id = id;
+        this.type = type;
+        this.owner = owner;
+        this.parameters = parameters;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java
new file mode 100644 (file)
index 0000000..f9cffd8
--- /dev/null
@@ -0,0 +1,83 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.producer.repository.Job.Parameters;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+@Component
+public class Jobs {
+    private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
+
+    private Map<String, Job> allJobs = new HashMap<>();
+
+    public Jobs() {
+    }
+
+    public synchronized Job getJob(String id) throws Exception {
+        Job job = allJobs.get(id);
+        if (job == null) {
+            throw new Exception("Could not find job: " + id);
+        }
+        return job;
+    }
+
+    public synchronized Job get(String id) {
+        return allJobs.get(id);
+    }
+
+    public void addJob(String id, InfoType type, String owner, Parameters parameters) {
+        Job job = new Job(id, type, owner, parameters);
+        this.put(job);
+    }
+
+    private synchronized void put(Job job) {
+        logger.debug("Put job: {}", job.getId());
+        allJobs.put(job.getId(), job);
+    }
+
+    public synchronized Iterable<Job> getAll() {
+        return new Vector<>(allJobs.values());
+    }
+
+    public synchronized int size() {
+        return allJobs.size();
+    }
+
+    public synchronized Job delete(String id) {
+        return allJobs.remove(id);
+    }
+
+    @Override
+    public String toString() {
+        Gson gson = new GsonBuilder().create();
+        return gson.toJson(allJobs);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml
new file mode 100644 (file)
index 0000000..31966df
--- /dev/null
@@ -0,0 +1,28 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+vars:
+  time: 1000
+  autostart: true
+  topic: mytopic #This topic is used only in autostart
+
+spring:
+  application:
+    name: demoProducer
+
+logging:
+  level:
+    root: INFO
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties b/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties
new file mode 100644 (file)
index 0000000..83e5539
--- /dev/null
@@ -0,0 +1,44 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+# The location of the Kafka server
+bootstrap.servers=${KAFKA_SERVERS:localhost:9092}
+
+# the default group ID
+group.id=test-group
+
+# the default topic to use if one is not provided
+default.topic=magic-topic
+
+# The number of records to pull of the stream every time
+# the client takes a trip out to Kafka
+max.poll.records=10
+
+# Make Kafka keep track of record reads by the consumer
+enable.auto.commit=true
+
+# The time in milliseconds to Kafka write the offset of the last message read
+auto.commit.interval.ms=500
+
+# classes for serializing and deserializing messages
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+enable.idempotence=false
diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml b/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml
new file mode 100644 (file)
index 0000000..3dbfe39
--- /dev/null
@@ -0,0 +1,44 @@
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+<configuration>
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>logging.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>logging.%i.log.zip</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>10MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+        </encoder>
+    </appender>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+        </encoder>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="FILE"/>
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
diff --git a/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java
new file mode 100644 (file)
index 0000000..da77873
--- /dev/null
@@ -0,0 +1,105 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer;
+
+import com.demo.producer.producer.SimpleProducer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+import com.demo.producer.messages.KafkaMessageHandler;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+class SimpleProducerTest {
+
+    private final int wait = 1000;
+    private final String topicName = "testTopic";
+
+    @Mock
+    private KafkaProducer<String, String> kafkaProducer;
+
+    @InjectMocks
+    @Autowired
+    private SimpleProducer simpleProducer;
+
+    private AutoCloseable closable;
+
+    @BeforeEach
+    void setUp() {
+        closable = MockitoAnnotations.openMocks(this);
+    }
+
+    @AfterEach
+    public void close() throws Exception {
+        closable.close();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked") //sending only Strings
+    void testRun() throws Exception {
+        int numberOfMessages = 10;
+        KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
+
+        simpleProducer.run(topicName, numberOfMessages, callback);
+
+        verify(kafkaProducer, times(numberOfMessages)).send(any(ProducerRecord.class));
+        verify(kafkaProducer, times(1)).close();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked") //sending only Strings
+    void testRunAlways() throws Exception {
+        KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
+        simpleProducer.setTIME(wait);
+        // Mocking behavior to break out of the loop after a few iterations
+        doAnswer(invocation -> {
+            simpleProducer.shutdown();
+            return null;
+        }).when(kafkaProducer).send(any(ProducerRecord.class));
+
+        // Invoking runAlways() in a separate thread to avoid an infinite loop
+        Thread thread = new Thread(() -> {
+            try {
+                simpleProducer.runAlways(topicName, callback);
+            } catch (Exception e) {
+            }
+        });
+        thread.start();
+
+        // Let the thread execute for some time (e.g., 1 second)
+        Thread.sleep(wait);
+
+        // Interrupting the thread to stop the infinite loop
+        thread.interrupt();
+
+        verify(kafkaProducer, atLeastOnce()).send(any(ProducerRecord.class));
+        verify(kafkaProducer, times(1)).close();
+    }
+
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java
new file mode 100644 (file)
index 0000000..d3587f1
--- /dev/null
@@ -0,0 +1,89 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer;
+
+import com.demo.producer.controllers.ThreadsController;
+import com.demo.producer.producer.SimpleProducer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.mock.web.MockHttpServletRequest;
+import org.springframework.mock.web.MockHttpServletResponse;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.web.context.request.RequestContextHolder;
+import org.springframework.web.context.request.ServletRequestAttributes;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.*;
+
+@SpringBootTest
+@ContextConfiguration(classes = SimpleProducer.class)
+public class ThreadsControllerTest {
+
+    @Mock
+    @Autowired
+    private SimpleProducer simpleProducer;
+
+    @InjectMocks
+    private ThreadsController threadsController;
+
+    private MockHttpServletRequest request;
+    private MockHttpServletResponse response;
+
+    private AutoCloseable closable;
+
+    @BeforeEach
+    public void setUp() {
+        closable = MockitoAnnotations.openMocks(this);
+        request = new MockHttpServletRequest();
+        response = new MockHttpServletResponse();
+        RequestContextHolder.setRequestAttributes(new ServletRequestAttributes(request, response));
+    }
+
+    @AfterEach
+    public void close() throws Exception {
+        closable.close();
+    }
+
+    @Test
+    public void testStopProducerWhenNoActiveProducer() throws Exception {
+        String result = threadsController.stopProducer();
+        assertEquals("No active producer to stop", result);
+        verify(simpleProducer, never()).shutdown();
+    }
+
+    @Test
+    public void testPublishNMessage() throws Exception {
+        String topicName = "testTopic";
+        int numberOfMessages = 10;
+        CompletableFuture<String> future = CompletableFuture.completedFuture("Producer started for topic: " + topicName);
+        CompletableFuture<String> result = threadsController.publishNMessage(numberOfMessages, topicName);
+        assertEquals(future.getClass(), result.getClass());
+    }
+
+}
diff --git a/sample-services/ics-producer-consumer/red.sh b/sample-services/ics-producer-consumer/red.sh
new file mode 100644 (file)
index 0000000..12085fe
--- /dev/null
@@ -0,0 +1,189 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+#!/bin/bash
+skip_build=false
+no_console=false
+
+# Parse command line arguments
+while [[ $# -gt 0 ]]; do
+    case $1 in
+        --skip-build)
+            skip_build=true
+            shift
+            ;;
+        --no-console)
+            no_console=true
+            shift
+            ;;
+        *)
+            echo "Unknown option: $1"
+            exit 1
+            ;;
+    esac
+done
+# Source the utils script
+source utils.sh
+
+# Check Prerequisites
+checkJava
+checkDocker
+checkDockerCompose
+
+if ! $skip_build; then
+    # Make build the demo docker image
+    cd ./producer/
+    make build
+    cd ../consumer/
+    make build
+    cd ..
+fi
+
+# Start the Docker containers in detached mode
+docker-compose up -d
+
+# Wait for the Kafka container to be running
+wait_for_container "kafka-zkless" "Kafka Server started"
+space
+
+if ! $no_console; then
+    echo "Start RedPanda Console"
+    docker-compose -f docker-composeRedPanda.yaml up -d
+    space
+
+    echo "Start NONRTRIC control panel"
+    docker-compose -f ./docker-compose/docker-compose.yaml -f ./docker-compose/control-panel/docker-compose.yaml -f ./docker-compose/nonrtric-gateway/docker-compose.yaml up -d
+    space
+fi
+
+# Once Kafka container is running, start the producers and consumers
+echo "Kafka container is up and running. Starting producer and consumer..."
+space
+
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
+echo "Sending type1 to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-producer/v1/info-types/type1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_job_data_schema": {
+    "$schema":"http://json-schema.org/draft-07/schema#",
+    "title":"STD_Type1_1.0.0",
+    "description":"Type 1",
+    "type":"object"
+  }
+}'
+
+echo "Getting types from ICS"
+curl -X 'GET' 'http://localhost:8083/data-producer/v1/info-types/type1'
+space
+
+echo "Sending Producer infos to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-producer/v1/info-producers/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_producer_supervision_callback_url": "http://kafka-producer:8080/producer/supervision",
+  "supported_info_types": [
+    "type1"
+  ],
+  "info_job_callback_url": "http://kafka-producer:8080/producer/job"
+}'
+
+echo "Getting Producers Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-producer/v1/info-producers/1'
+space
+
+echo "Sending Consumer Job infos to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-consumer/v1/info-jobs/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_type_id": "type1",
+  "job_owner": "demo",
+  "job_definition": {
+    "deliveryInfo": {
+      "topic": "mytopic",
+      "bootStrapServers": "http://kafka-zkless:9092",
+      "numberOfMessages": 0
+    }
+  },
+  "job_result_uri": "http://kafka-producer:8080/producer/job",
+  "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+echo "Getting Consumer Job Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-consumer/v1/info-jobs/1'
+space
+
+echo "Sending Consumer Subscription Job infos to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-consumer/v1/info-type-subscription/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "status_result_uri": "http://kafka-consumer:8081/info-type-status",
+  "owner": "owner"
+}'
+echo "Getting Consumer Subscription Job infos from ICS"
+curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
+space
+
+sleep 5
+echo "ICS Producer Docker logs "
+docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
+space
+echo "Demo Producer Docker logs "
+docker logs kafka-producer | grep c.d.p.p.SimpleProducer
+space
+echo "Demo Consumer Docker logs "
+docker logs kafka-consumer | grep c.d.c.c.SimpleConsumer
+space
+
+if ! $no_console; then
+    echo "Red Panda Console: http://localhost:8888"
+    echo "Control Panel Console: http://localhost:8181"
+fi
+
+echo "Done."
+
+containers=("kafka-producer" "kafka-consumer")
+
+for container in "${containers[@]}"; do
+  if docker logs "$container" | grep -q ERROR; then
+    echo "Errors found in logs of $container"
+    echo "FAIL"
+    exit 1
+  else
+    echo "No errors found in logs of $container"
+  fi
+done
+echo "SUCCESS"
+exit 0
diff --git a/sample-services/ics-producer-consumer/runconsumer.sh b/sample-services/ics-producer-consumer/runconsumer.sh
new file mode 100644 (file)
index 0000000..9ef776a
--- /dev/null
@@ -0,0 +1,2 @@
+#!/bin/bash
+curl -X GET http://localhost:8081/startConsumer/mytopic
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/runproducer.sh b/sample-services/ics-producer-consumer/runproducer.sh
new file mode 100644 (file)
index 0000000..f9f5a7e
--- /dev/null
@@ -0,0 +1,2 @@
+#!/bin/bash
+curl -X GET http://localhost:8080/startProducer/mytopic
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/start.sh b/sample-services/ics-producer-consumer/start.sh
new file mode 100755 (executable)
index 0000000..18317a0
--- /dev/null
@@ -0,0 +1,180 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#!/bin/bash
+source utils.sh
+
+PREFIX="nexus3.o-ran-sc.org:10004"
+VERSION="0.0.1"
+
+# Create a network for Kafka Containers
+docker network create kafka-net
+
+# Start Kafka
+docker run -d \
+  --network kafka-net \
+  --name kafka-zkless \
+  -p 9092:9092 \
+  -e LOG_DIR="/tmp/logs" \
+  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" \
+  -e KAFKA_LISTENERS="PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093" \
+  -e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kafka-zkless:29092,PLAINTEXT_HOST://kafka-zkless:9092" \
+  quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 \
+  /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && \
+  bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && \
+  bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$KAFKA_ADVERTISED_LISTENERS --override listener.security.protocol.map=$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP --override listeners=$KAFKA_LISTENERS'
+
+# Start ICS
+docker run -d \
+  --network kafka-net \
+  --name informationcoordinatorservice \
+  -p 8083:8083 \
+  -v ./application.yaml:/opt/app/information-coordinator-service/config/application.yaml \
+  nexus3.o-ran-sc.org:10001/o-ran-sc/nonrtric-plt-informationcoordinatorservice:1.6.0
+
+# Start Producer
+docker run -d \
+  --network kafka-net \
+  --name kafka-producer \
+  -p 8080:8080 \
+  -e KAFKA_SERVERS=kafka-zkless:9092 \
+  $PREFIX/o-ran-sc/nonrtric-sample-icsproducer:$VERSION
+
+#Start Consumer
+docker run -d \
+  --network kafka-net \
+  --name kafka-consumer \
+  -p 8081:8081 \
+  -e KAFKA_SERVERS=kafka-zkless:9092 \
+  $PREFIX/o-ran-sc/nonrtric-sample-icsconsumer:$VERSION
+
+# Wait for the Kafka container to be running
+wait_for_container "kafka-zkless" "Kafka Server started"
+wait_for_container "kafka-producer" "Started Application"
+wait_for_container "kafka-consumer" "Started Application"
+
+# Once Kafka container is running, start the producers and consumers
+echo "Kafka container is up and running. Starting producer and consumer..."
+space
+
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
+echo "Sending type1 to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-producer/v1/info-types/type1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_job_data_schema": {
+    "$schema":"http://json-schema.org/draft-07/schema#",
+    "title":"STD_Type1_1.0.0",
+    "description":"Type 1",
+    "type":"object"
+  }
+}'
+
+echo "Getting types from ICS"
+curl -X 'GET' 'http://localhost:8083/data-producer/v1/info-types/type1'
+space
+
+echo "Sending Producer infos to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-producer/v1/info-producers/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_producer_supervision_callback_url": "http://kafka-producer:8080/producer/supervision",
+  "supported_info_types": [
+    "type1"
+  ],
+  "info_job_callback_url": "http://kafka-producer:8080/producer/job"
+}'
+
+echo "Getting Producers Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-producer/v1/info-producers/1'
+space
+
+echo "Sending Consumer Job infos to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-consumer/v1/info-jobs/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "info_type_id": "type1",
+  "job_owner": "demo",
+  "job_definition": {
+    "deliveryInfo": {
+      "topic": "mytopic",
+      "bootStrapServers": "http://kafka-zkless:9092",
+      "numberOfMessages": 0
+    }
+  },
+  "job_result_uri": "http://kafka-producer:8080/producer/job",
+  "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+echo "Getting Consumer Job Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-consumer/v1/info-jobs/1'
+space
+
+echo "Sending Consumer Subscription Job infos to ICS"
+curl -X 'PUT' \
+  'http://localhost:8083/data-consumer/v1/info-type-subscription/1' \
+  -H 'accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{
+  "status_result_uri": "http://kafka-consumer:8081/info-type-status",
+  "owner": "owner"
+}'
+echo "Getting Consumer Subscription Job infos from ICS"
+curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
+space
+
+sleep 5
+echo "ICS Producer Docker logs "
+docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
+space
+echo "Demo Producer Docker logs "
+docker logs kafka-producer | grep c.d.p.p.SimpleProducer
+space
+echo "Demo Consumer Docker logs "
+docker logs kafka-consumer | grep c.d.c.c.SimpleConsumer
+space
+
+echo "Done."
+
+containers=("kafka-producer" "kafka-consumer")
+
+for container in "${containers[@]}"; do
+  if docker logs "$container" | grep -q ERROR; then
+    echo "Errors found in logs of $container"
+    echo "FAIL"
+    exit 1
+  else
+    echo "No errors found in logs of $container"
+  fi
+done
+echo "SUCCESS"
+exit 0
diff --git a/sample-services/ics-producer-consumer/stop.sh b/sample-services/ics-producer-consumer/stop.sh
new file mode 100644 (file)
index 0000000..60fba31
--- /dev/null
@@ -0,0 +1,27 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+#!/bin/bash
+docker-compose down
+
+docker-compose -f docker-composeRedPanda.yaml down
+
+docker-compose -f ./docker-compose/docker-compose.yaml \
+    -f ./docker-compose/control-panel/docker-compose.yaml \
+    -f ./docker-compose/nonrtric-gateway/docker-compose.yaml \
+    down
diff --git a/sample-services/ics-producer-consumer/utils.sh b/sample-services/ics-producer-consumer/utils.sh
new file mode 100644 (file)
index 0000000..68d19ec
--- /dev/null
@@ -0,0 +1,98 @@
+#  ========================LICENSE_START=================================
+#  O-RAN-SC
+#
+#  Copyright (C) 2024: OpenInfra Foundation Europe
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+
+#!/bin/bash
+
+checkJava() {
+    if ! command -v java >/dev/null 2>&1; then
+        echo "Java is not installed. Please install Java."
+        echo "Suggested fix for ubuntu:"
+        echo "sudo apt install default-jdk"
+        exit 1
+    else
+        echo "Java is installed."
+    fi
+}
+
+checkMaven() {
+    if mvn -v >/dev/null 2>&1; then
+        echo "Maven is installed."
+    else
+        echo "Maven is not installed. Please install Maven."
+        echo "Suggested fix for ubuntu:"
+        echo "sudo apt install maven"
+        exit 1
+    fi
+}
+
+checkDocker() {
+    if ! docker -v > /dev/null 2>&1; then
+        echo "Docker is not installed. Please install Docker."
+        echo "Suggested fix for ubuntu:"
+        echo "sudo apt-get update"
+        echo "sudo apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release"
+        echo "curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg"
+        echo "echo \"deb [arch=\$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \$(lsb_release -cs) stable\" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null"
+        echo "sudo apt-get update"
+        echo "sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin"
+        echo "sudo usermod -aG docker \$USER"
+               echo "newgrp docker"
+        exit 1
+    else
+        echo "Docker is installed."
+    fi
+}
+
+checkDockerCompose() {
+    if ! docker-compose -v > /dev/null 2>&1; then
+        echo "docker-compose is not installed. Please install docker-compose"
+        echo "Suggested fix for ubuntu:"
+        echo "sudo apt-get install docker-compose-plugin"
+        exit 1
+    else
+        echo "docker-compose is installed."
+    fi
+}
+
+# Function to wait for a Docker container to be running and log a specific string
+wait_for_container() {
+    local container_name="$1"
+    local log_string="$2"
+
+    while ! docker inspect "$container_name" &>/dev/null; do
+        echo "Waiting for container '$container_name' to be created..."
+        sleep 5
+    done
+
+    while [ "$(docker inspect -f '{{.State.Status}}' "$container_name")" != "running" ]; do
+        echo "Waiting for container '$container_name' to be running..."
+        sleep 5
+    done
+
+    # Check container logs for the specified string
+    while ! docker logs "$container_name" 2>&1 | grep "$log_string"; do
+        echo "Waiting for '$log_string' in container logs of '$container_name'..."
+        sleep 5
+    done
+}
+
+space() {
+    echo ""
+    echo "++++++++++++++++++++++++++++++++++++++++++++++++++++"
+    echo ""
+}