Merge "Update documentation for DMaaP Mediator Producer"
authorHenrik Andersson <henrik.b.andersson@est.tech>
Wed, 19 Jan 2022 11:49:10 +0000 (11:49 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Wed, 19 Jan 2022 11:49:10 +0000 (11:49 +0000)
14 files changed:
a1-policy-management-service/pom.xml
dmaap-adaptor-java/Dockerfile
dmaap-adaptor-java/pom.xml
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
information-coordinator-service/Dockerfile
onap/oran
r-app-catalogue/Dockerfile
test/usecases/oruclosedlooprecovery/scriptversion/app/Dockerfile

index b90bb88..870874d 100644 (file)
@@ -48,8 +48,7 @@
     <properties>
         <java.version>11</java.version>
         <springfox.version>3.0.0</springfox.version>
-        <immutable.version>2.8.2</immutable.version>
-        <sdk.version>1.1.6</sdk.version>
+        <immutable.version>2.8.2</immutable.version>      
         <swagger.version>2.1.12</swagger.version>
         <json.version>20190722</json.version>
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
@@ -70,7 +69,7 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <version>30.0-jre</version>
+            <version>31.0.1-jre</version>
         </dependency>
         <dependency>
             <groupId>org.springdoc</groupId>
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
-        <dependency>
-            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-            <artifactId>cbs-client</artifactId>
-            <version>${sdk.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
\ No newline at end of file
+</project>
index b2c0c30..f565e80 100644 (file)
@@ -30,14 +30,22 @@ WORKDIR /opt/app/dmaap-adaptor-service
 RUN mkdir -p /var/log/dmaap-adaptor-service
 RUN mkdir -p /opt/app/dmaap-adaptor-service/etc/cert/
 RUN mkdir -p /var/dmaap-adaptor-service
-RUN chmod -R 777 /var/dmaap-adaptor-service
 
 ADD /config/application.yaml /opt/app/dmaap-adaptor-service/config/application.yaml
 ADD /config/application_configuration.json /opt/app/dmaap-adaptor-service/data/application_configuration.json_example
 ADD /config/keystore.jks /opt/app/dmaap-adaptor-service/etc/cert/keystore.jks
 ADD /config/truststore.jks /opt/app/dmaap-adaptor-service/etc/cert/truststore.jks
 
-RUN chmod -R 777 /opt/app/dmaap-adaptor-service/config/
+ARG user=nonrtric
+ARG group=nonrtric
+
+RUN groupadd $user && \
+    useradd -r -g $group $user
+RUN chown -R $user:$group /opt/app/dmaap-adaptor-service
+RUN chown -R $user:$group /var/log/dmaap-adaptor-service
+RUN chown -R $user:$group /var/dmaap-adaptor-service
+
+USER ${user}
 
 ADD target/${JAR} /opt/app/dmaap-adaptor-service/dmaap-adaptor.jar
 CMD ["java", "-jar", "/opt/app/dmaap-adaptor-service/dmaap-adaptor.jar"]
index 71f2bb8..b555912 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.6.2</version>
+        <version>2.5.8</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric</groupId>
@@ -56,7 +56,6 @@
         <spotless-maven-plugin.version>1.24.3</spotless-maven-plugin.version>
         <swagger-codegen-maven-plugin.version>3.0.11</swagger-codegen-maven-plugin.version>
         <docker-maven-plugin>0.30.0</docker-maven-plugin>
-        <javax.ws.rs-api.version>2.1.1</javax.ws.rs-api.version>
         <sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
         <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
         <exec.skip>true</exec.skip>
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
-            <version>1.3.7</version>
+            <version>1.3.9</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
\ No newline at end of file
+</project>
index 4809017..5233401 100644 (file)
@@ -100,7 +100,9 @@ public class KafkaTopicConsumers {
     public synchronized void restartNonRunningTopics() {
         for (String typeId : this.consumers.keySet()) {
             for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
-                restartTopic(consumer);
+                if (!consumer.isRunning()) {
+                    restartTopic(consumer);
+                }
             }
         }
     }
index 38e7807..f7e6e87 100644 (file)
@@ -6,16 +6,20 @@
       "type": "string"
     },
     "maxConcurrency": {
-      "type": "integer"
+      "type": "integer",
+      "minimum": 1
     },
     "bufferTimeout": {
       "type": "object",
       "properties": {
         "maxSize": {
-          "type": "integer"
+          "type": "integer",
+          "minimum": 1
         },
         "maxTimeMiliseconds": {
-          "type": "integer"
+          "type": "integer",
+          "minimum": 0,
+          "maximum": 160000
         }
       },
       "additionalProperties": false,
index 8c41423..6660175 100644 (file)
@@ -333,11 +333,11 @@ class ApplicationTest {
                 () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
     }
 
-    private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
+    public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
         testErrorCode(request, expStatus, responseContains, true);
     }
 
-    private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains,
+    public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains,
             boolean expectApplicationProblemJsonMediaType) {
         StepVerifier.create(request) //
                 .expectSubscription() //
@@ -346,7 +346,7 @@ class ApplicationTest {
                 .verify();
     }
 
-    private boolean checkWebClientError(Throwable throwable, HttpStatus expStatus, String responseContains,
+    private static boolean checkWebClientError(Throwable throwable, HttpStatus expStatus, String responseContains,
             boolean expectApplicationProblemJsonMediaType) {
         assertTrue(throwable instanceof WebClientResponseException);
         WebClientResponseException responseException = (WebClientResponseException) throwable;
index d1d7e91..9f0ef19 100644 (file)
@@ -49,9 +49,11 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
+import org.springframework.http.HttpStatus;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
+
 @SuppressWarnings("java:S3577") // Rename class
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@@ -232,7 +234,23 @@ class IntegrationWithIcs {
     }
 
     @Test
-    void testWholeChain() throws Exception {
+    void testKafkaJobParameterOutOfRange() {
+        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
+        final String TYPE_ID = "KafkaInformationType";
+
+        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1);
+
+        ConsumerJobInfo jobInfo =
+                new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
+        String body = gson.toJson(jobInfo);
+
+        ApplicationTest.testErrorCode(restClient().put(jobUrl("KAFKA_JOB_ID"), body), HttpStatus.BAD_REQUEST,
+                "Json validation failure");
+
+    }
+
+    @Test
+    void testDmaapMessage() throws Exception {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
 
         createInformationJobInIcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
@@ -250,7 +268,6 @@ class IntegrationWithIcs {
         deleteInformationJobInIcs(DMAAP_JOB_ID);
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-
     }
 
 }
index c38af8a..5a48d61 100644 (file)
@@ -215,7 +215,7 @@ class IntegrationWithKafka {
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -236,6 +236,8 @@ class IntegrationWithKafka {
                 .doOnError(e -> logger.error("Send failed", e)) //
                 .blockLast();
 
+        sender.close();
+
     }
 
     private void verifiedReceivedByConsumer(String... strings) {
@@ -246,6 +248,29 @@ class IntegrationWithKafka {
         }
     }
 
+    @Test
+    void simpleCase() throws InterruptedException {
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        Thread.sleep(4000);
+        var dataToSend = Flux.just(senderRecord("Message"));
+        sendDataToStream(dataToSend);
+
+        verifiedReceivedByConsumer("Message");
+
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+    }
+
     @Test
     void kafkaIntegrationTest() throws Exception {
         final String JOB_ID1 = "ID1";
@@ -256,12 +281,13 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
-        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+        this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
                 restClient());
         this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
+        Thread.sleep(2000);
         var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend);
 
index 0bf2f12..c84e277 100644 (file)
@@ -399,15 +399,30 @@ func (j *job) read(bufferParams BufferTimeout) []byte {
 }
 
 func getAsJSONArray(rawMsgs [][]byte) []byte {
-       json := `"[`
+       if len(rawMsgs) == 0 {
+               return []byte("")
+       }
+       strings := ""
        for i := 0; i < len(rawMsgs); i++ {
-               msg := string(rawMsgs[i])
-               json = json + strings.ReplaceAll(msg, "\"", "\\\"")
-               if i < len(rawMsgs)-1 {
-                       json = json + ","
-               }
+               strings = strings + makeIntoString(rawMsgs[i])
+               strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
        }
-       return []byte(json + `]"`)
+       return []byte(wrapInJSONArray(strings))
+}
+
+func makeIntoString(rawMsg []byte) string {
+       return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
+}
+
+func addSeparatorIfNeeded(strings string, position, length int) string {
+       if position < length-1 {
+               strings = strings + ","
+       }
+       return strings
+}
+
+func wrapInJSONArray(strings string) string {
+       return "[" + strings + "]"
 }
 
 func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
index 2dc700b..7d02104 100644 (file)
@@ -388,7 +388,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
-                       assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t))
+                       assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
@@ -418,6 +418,7 @@ func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
        go func() {
                jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
                jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
+               jobUnderTest.messagesChannel <- []byte("ABCDEFGH")
        }()
 
        if waitTimeout(&wg, 2*time.Second) {
@@ -442,7 +443,7 @@ func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t
                MaxTimeMiliseconds: 200,
        })
 
-       assertions.Equal([]byte("\"[0,1]\""), msgs)
+       assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
 }
 func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
        assertions := require.New(t)
@@ -461,7 +462,7 @@ func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t
                MaxTimeMiliseconds: 30,
        })
 
-       assertions.Equal([]byte("\"[0,1]\""), msgs)
+       assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
 }
 
 func fillMessagesBuffer(mc chan []byte) {
index e9d179d..cc8813e 100644 (file)
@@ -25,7 +25,6 @@ WORKDIR /opt/app/information-coordinator-service
 RUN mkdir -p /var/log/information-coordinator-service
 RUN mkdir -p /opt/app/information-coordinator-service/etc/cert/
 RUN mkdir -p /var/information-coordinator-service
-RUN chmod -R 777 /var/information-coordinator-service
 
 EXPOSE 8083 8434
 
@@ -34,8 +33,16 @@ ADD target/${JAR} /opt/app/information-coordinator-service/information-coordinat
 ADD /config/keystore.jks /opt/app/information-coordinator-service/etc/cert/keystore.jks
 ADD /config/truststore.jks /opt/app/information-coordinator-service/etc/cert/truststore.jks
 
+ARG user=nonrtric
+ARG group=nonrtric
 
-RUN chmod -R 777 /opt/app/information-coordinator-service/config/
+RUN groupadd $user && \
+    useradd -r -g $group $user
+RUN chown -R $user:$group /opt/app/information-coordinator-service
+RUN chown -R $user:$group /var/log/information-coordinator-service
+RUN chown -R $user:$group /var/information-coordinator-service
+
+USER ${user}
 
 CMD ["java", "-jar", "/opt/app/information-coordinator-service/information-coordinator-service.jar"]
 
index c6cca20..0f8b205 160000 (submodule)
--- a/onap/oran
+++ b/onap/oran
@@ -1 +1 @@
-Subproject commit c6cca20e3fd10842be0ca782d143dfa59d5c086e
+Subproject commit 0f8b20544745afaf9c7b38140b9516667d9c4752
index cd2efc9..ed4be95 100644 (file)
@@ -31,8 +31,15 @@ ADD /config/application.yaml /opt/app/r-app-catalogue/config/application.yaml
 ADD /config/r-app-catalogue-keystore.jks /opt/app/r-app-catalogue/etc/cert/keystore.jks
 ADD target/${JAR} /opt/app/r-app-catalogue/r-app-catalogue.jar
 
+ARG user=nonrtric
+ARG group=nonrtric
 
-RUN chmod -R 777 /opt/app/r-app-catalogue/config/
+RUN groupadd $user && \
+    useradd -r -g $group $user
+RUN chown -R $user:$group /opt/app/r-app-catalogue
+RUN chown -R $user:$group /var/log/r-app-catalogue
+
+USER ${user}
 
 CMD ["java", "-jar", "/opt/app/r-app-catalogue/r-app-catalogue.jar"]
 
index 4cb03c7..21b24b1 100644 (file)
@@ -29,4 +29,13 @@ RUN apt-get install iputils-ping -y
 
 RUN pip install -r requirements.txt
 
+ARG user=nonrtric
+ARG group=nonrtric
+
+RUN groupadd $user && \
+    useradd -r -g $group $user
+RUN chown -R $user:$group /usr/src/app/
+
+USER ${user}
+
 CMD [ "python3", "-u", "main.py" ]