Merge "Storage of PM Data"
authorJohn Keeney <john.keeney@est.tech>
Wed, 1 Mar 2023 10:41:29 +0000 (10:41 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Wed, 1 Mar 2023 10:41:29 +0000 (10:41 +0000)
pmlog/api/pmlog-api.json
pmlog/api/pmlog-api.yaml
pmlog/src/main/java/org/oran/pmlog/ConsumerRegstrationTask.java
pmlog/src/main/java/org/oran/pmlog/KafkaTopicListener.java
pmlog/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java
pmlog/src/test/java/org/oran/pmlog/ApplicationTest.java
pmlog/src/test/java/org/oran/pmlog/IcsSimulatorController.java [new file with mode: 0644]
pmlog/src/test/java/org/oran/pmlog/Integration.java

index 1d2534b..560c8da 100644 (file)
@@ -8,6 +8,51 @@
     }}},
     "openapi": "3.0.1",
     "paths": {
+        "/actuator/threaddump": {"get": {
+            "summary": "Actuator web endpoint 'threaddump'",
+            "operationId": "threaddump_4",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
+        "/actuator/info": {"get": {
+            "summary": "Actuator web endpoint 'info'",
+            "operationId": "info_2",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
+        "/actuator/loggers": {"get": {
+            "summary": "Actuator web endpoint 'loggers'",
+            "operationId": "loggers_2",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
+        "/actuator/health/**": {"get": {
+            "summary": "Actuator web endpoint 'health-path'",
+            "operationId": "health-path_2",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
+        "/actuator/shutdown": {"post": {
+            "summary": "Actuator web endpoint 'shutdown'",
+            "operationId": "shutdown_2",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
         "/actuator/metrics/{requiredMetricName}": {"get": {
             "summary": "Actuator web endpoint 'metrics-requiredMetricName'",
             "operationId": "metrics-requiredMetricName_2",
             }},
             "tags": ["Actuator"]
         }},
-        "/actuator/threaddump": {"get": {
-            "summary": "Actuator web endpoint 'threaddump'",
-            "operationId": "threaddump_4",
+        "/actuator/logfile": {"get": {
+            "summary": "Actuator web endpoint 'logfile'",
+            "operationId": "logfile_2",
             "responses": {"200": {
                 "description": "OK",
                 "content": {"*/*": {"schema": {"type": "object"}}}
             }},
             "tags": ["Actuator"]
         }},
-        "/actuator/logfile": {"get": {
-            "summary": "Actuator web endpoint 'logfile'",
-            "operationId": "logfile_2",
+        "/data-consumer/v1/info-jobs/{infoJobId}": {"put": {
+            "requestBody": {
+                "content": {"application/json": {"schema": {"type": "string"}}},
+                "required": true
+            },
+            "operationId": "putIndividualInfoJob",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {"application/json": {"schema": {"type": "object"}}}
             }},
-            "tags": ["Actuator"]
+            "parameters": [{
+                "schema": {"type": "string"},
+                "in": "path",
+                "name": "infoJobId",
+                "required": true
+            }],
+            "tags": ["Information Coordinator Service Simulator (exists only in test)"]
         }},
         "/actuator/loggers/{name}": {
             "post": {
                 "tags": ["Actuator"]
             }
         },
-        "/actuator/info": {"get": {
-            "summary": "Actuator web endpoint 'info'",
-            "operationId": "info_2",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
-            }},
-            "tags": ["Actuator"]
-        }},
         "/actuator/health": {"get": {
             "summary": "Actuator web endpoint 'health'",
             "operationId": "health_2",
             }},
             "tags": ["Actuator"]
         }},
-        "/actuator/loggers": {"get": {
-            "summary": "Actuator web endpoint 'loggers'",
-            "operationId": "loggers_2",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
-            }},
-            "tags": ["Actuator"]
-        }},
-        "/actuator/health/**": {"get": {
-            "summary": "Actuator web endpoint 'health-path'",
-            "operationId": "health-path_2",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
-            }},
-            "tags": ["Actuator"]
-        }},
         "/actuator/metrics": {"get": {
             "summary": "Actuator web endpoint 'metrics'",
             "operationId": "metrics_2",
                 "content": {"*/*": {"schema": {"type": "object"}}}
             }},
             "tags": ["Actuator"]
-        }},
-        "/actuator/shutdown": {"post": {
-            "summary": "Actuator web endpoint 'shutdown'",
-            "operationId": "shutdown_2",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
-            }},
-            "tags": ["Actuator"]
         }}
     },
     "info": {
         "title": "PM Logger",
         "version": "1.0"
     },
-    "tags": [{
-        "name": "Actuator",
-        "description": "Monitor and interact",
-        "externalDocs": {
-            "description": "Spring Boot Actuator Web API Documentation",
-            "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/"
+    "tags": [
+        {"name": "Information Coordinator Service Simulator (exists only in test)"},
+        {
+            "name": "Actuator",
+            "description": "Monitor and interact",
+            "externalDocs": {
+                "description": "Spring Boot Actuator Web API Documentation",
+                "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/"
+            }
         }
-    }]
+    ]
 }
\ No newline at end of file
index b914489..5dec51b 100644 (file)
@@ -10,12 +10,78 @@ info:
 servers:
 - url: /
 tags:
+- name: Information Coordinator Service Simulator (exists only in test)
 - name: Actuator
   description: Monitor and interact
   externalDocs:
     description: Spring Boot Actuator Web API Documentation
     url: https://docs.spring.io/spring-boot/docs/current/actuator-api/html/
 paths:
+  /actuator/threaddump:
+    get:
+      tags:
+      - Actuator
+      summary: Actuator web endpoint 'threaddump'
+      operationId: threaddump_4
+      responses:
+        200:
+          description: OK
+          content:
+            '*/*':
+              schema:
+                type: object
+  /actuator/info:
+    get:
+      tags:
+      - Actuator
+      summary: Actuator web endpoint 'info'
+      operationId: info_2
+      responses:
+        200:
+          description: OK
+          content:
+            '*/*':
+              schema:
+                type: object
+  /actuator/loggers:
+    get:
+      tags:
+      - Actuator
+      summary: Actuator web endpoint 'loggers'
+      operationId: loggers_2
+      responses:
+        200:
+          description: OK
+          content:
+            '*/*':
+              schema:
+                type: object
+  /actuator/health/**:
+    get:
+      tags:
+      - Actuator
+      summary: Actuator web endpoint 'health-path'
+      operationId: health-path_2
+      responses:
+        200:
+          description: OK
+          content:
+            '*/*':
+              schema:
+                type: object
+  /actuator/shutdown:
+    post:
+      tags:
+      - Actuator
+      summary: Actuator web endpoint 'shutdown'
+      operationId: shutdown_2
+      responses:
+        200:
+          description: OK
+          content:
+            '*/*':
+              schema:
+                type: object
   /actuator/metrics/{requiredMetricName}:
     get:
       tags:
@@ -54,12 +120,12 @@ paths:
                   type: object
                   additionalProperties:
                     $ref: '#/components/schemas/Link'
-  /actuator/threaddump:
+  /actuator/logfile:
     get:
       tags:
       - Actuator
-      summary: Actuator web endpoint 'threaddump'
-      operationId: threaddump_4
+      summary: Actuator web endpoint 'logfile'
+      operationId: logfile_2
       responses:
         200:
           description: OK
@@ -67,17 +133,30 @@ paths:
             '*/*':
               schema:
                 type: object
-  /actuator/logfile:
-    get:
+  /data-consumer/v1/info-jobs/{infoJobId}:
+    put:
       tags:
-      - Actuator
-      summary: Actuator web endpoint 'logfile'
-      operationId: logfile_2
+      - Information Coordinator Service Simulator (exists only in test)
+      operationId: putIndividualInfoJob
+      parameters:
+      - name: infoJobId
+        in: path
+        required: true
+        style: simple
+        explode: false
+        schema:
+          type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              type: string
+        required: true
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/json:
               schema:
                 type: object
   /actuator/loggers/{name}:
@@ -121,19 +200,6 @@ paths:
             '*/*':
               schema:
                 type: object
-  /actuator/info:
-    get:
-      tags:
-      - Actuator
-      summary: Actuator web endpoint 'info'
-      operationId: info_2
-      responses:
-        200:
-          description: OK
-          content:
-            '*/*':
-              schema:
-                type: object
   /actuator/health:
     get:
       tags:
@@ -147,32 +213,6 @@ paths:
             '*/*':
               schema:
                 type: object
-  /actuator/loggers:
-    get:
-      tags:
-      - Actuator
-      summary: Actuator web endpoint 'loggers'
-      operationId: loggers_2
-      responses:
-        200:
-          description: OK
-          content:
-            '*/*':
-              schema:
-                type: object
-  /actuator/health/**:
-    get:
-      tags:
-      - Actuator
-      summary: Actuator web endpoint 'health-path'
-      operationId: health-path_2
-      responses:
-        200:
-          description: OK
-          content:
-            '*/*':
-              schema:
-                type: object
   /actuator/metrics:
     get:
       tags:
@@ -199,19 +239,6 @@ paths:
             '*/*':
               schema:
                 type: object
-  /actuator/shutdown:
-    post:
-      tags:
-      - Actuator
-      summary: Actuator web endpoint 'shutdown'
-      operationId: shutdown_2
-      responses:
-        200:
-          description: OK
-          content:
-            '*/*':
-              schema:
-                type: object
 components:
   schemas:
     Link:
index 34954f8..573b2c3 100644 (file)
@@ -74,7 +74,7 @@ public class ConsumerRegstrationTask {
     private void createSubscription() {
         putInfoJob() //
                 .doOnError(this::handleRegistrationFailure)
-                .retryWhen(Retry.fixedDelay(100, Duration.ofMillis(30 * 1000))) //
+                .retryWhen(Retry.fixedDelay(100, Duration.ofMillis(5 * 1000))) //
                 .subscribe( //
                         null, //
                         this::handleRegistrationFailure, //
index b5d2dae..390ccae 100644 (file)
 
 package org.oran.pmlog;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -97,20 +94,4 @@ public class KafkaTopicListener {
                 .subscription(Collections.singleton(this.applicationConfig.getKafkaInputTopic()));
     }
 
-    public static byte[] unzip(byte[] bytes) throws IOException {
-        try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
-            return gzipInput.readAllBytes();
-        }
-    }
-
-    private static byte[] unzip(byte[] bytes, String fileName) {
-        try {
-            return fileName.endsWith(".gz") ? unzip(bytes) : bytes;
-        } catch (IOException e) {
-            logger.error("Error while decompression, file: {}, reason: {}", fileName, e.getMessage());
-            return new byte[0];
-        }
-
-    }
-
 }
index 8df61cd..84c5083 100644 (file)
@@ -25,7 +25,6 @@ import java.nio.charset.Charset;
 import java.nio.file.Files;
 
 import lombok.Getter;
-import lombok.Setter;
 import lombok.ToString;
 
 import org.slf4j.Logger;
@@ -69,9 +68,8 @@ public class ApplicationConfig {
     private int httpProxyPort = 0;
 
     @Getter
-    @Setter
     @Value("${server.port}")
-    private int localServerHttpPort;
+    private int localServerHttpsPort;
 
     @Getter
     @Value("${app.kafka.max-poll-records:300}")
index bbd0e88..da12e0a 100644 (file)
@@ -21,6 +21,7 @@
 package org.oran.pmlog;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.spy;
 
@@ -44,6 +45,7 @@ import org.oran.pmlog.clients.AsyncRestClient;
 import org.oran.pmlog.clients.AsyncRestClientFactory;
 import org.oran.pmlog.clients.SecurityContext;
 import org.oran.pmlog.configuration.ApplicationConfig;
+import org.oran.pmlog.configuration.ConsumerJobInfo;
 import org.oran.pmlog.configuration.WebClientConfig;
 import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
 import org.slf4j.Logger;
@@ -52,7 +54,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
 import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
@@ -62,7 +63,7 @@ import org.springframework.test.context.TestPropertySource;
 import reactor.core.publisher.Flux;
 
 @TestMethodOrder(MethodOrderer.MethodName.class)
-@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
+@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
@@ -76,17 +77,25 @@ class ApplicationTest {
     @Autowired
     SecurityContext securityContext;
 
-    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+    @Autowired
+    private IcsSimulatorController icsSimulatorController;
+
+    @Autowired
+    private ConsumerRegstrationTask consumerRegstrationTask;
 
-    @LocalServerPort
-    int port;
+    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
 
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     static class TestApplicationConfig extends ApplicationConfig {
 
+        @Override
+        public String getIcsBaseUrl() {
+            return thisProcessUrl();
+        }
+
         String thisProcessUrl() {
-            final String url = "https://localhost:" + getLocalServerHttpPort();
+            final String url = "https://localhost:" + getLocalServerHttpsPort();
             return url;
         }
     }
@@ -135,7 +144,7 @@ class ApplicationTest {
 
     @AfterEach
     void reset() {
-
+        this.icsSimulatorController.testResults.reset();
     }
 
     @Test
@@ -165,12 +174,21 @@ class ApplicationTest {
         Mockito.verify(TestBeanFactory.influxStore, Mockito.times(1)).store(any(), any());
     }
 
+    @Test
+    void testJobCreation() throws Exception {
+        await().untilAsserted(() -> assertThat(consumerRegstrationTask.isRegisteredInIcs()).isTrue());
+        ConsumerJobInfo createdJob = this.icsSimulatorController.testResults.createdJob;
+        assertThat(createdJob).isNotNull();
+        assertThat(createdJob.jobDefinition.getDeliveryInfo().getTopic())
+                .isEqualTo(applicationConfig.getKafkaInputTopic());
+    }
+
     private AsyncRestClient restClient() {
         return restClient(false);
     }
 
     private String baseUrl() {
-        return "https://localhost:" + this.port;
+        return "https://localhost:" + this.applicationConfig.getLocalServerHttpsPort();
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
diff --git a/pmlog/src/test/java/org/oran/pmlog/IcsSimulatorController.java b/pmlog/src/test/java/org/oran/pmlog/IcsSimulatorController.java
new file mode 100644 (file)
index 0000000..e04655d
--- /dev/null
@@ -0,0 +1,77 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2023 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.pmlog;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import io.swagger.v3.oas.annotations.tags.Tag;
+
+import java.lang.invoke.MethodHandles;
+
+import org.oran.pmlog.configuration.ConsumerJobInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController("IcsSimulatorController")
+@Tag(name = "Information Coordinator Service Simulator (exists only in test)")
+public class IcsSimulatorController {
+
+    private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private final static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+    public static class TestResults {
+
+        ConsumerJobInfo createdJob = null;
+
+        public TestResults() {}
+
+        public synchronized void reset() {
+            createdJob = null;
+        }
+
+        public void setCreatedJob(ConsumerJobInfo informationJobObject) {
+            this.createdJob = informationJobObject;
+        }
+    }
+
+    final TestResults testResults = new TestResults();
+
+    @PutMapping(path = "/data-consumer/v1/info-jobs/{infoJobId}", //
+            produces = MediaType.APPLICATION_JSON_VALUE, //
+            consumes = MediaType.APPLICATION_JSON_VALUE)
+    public ResponseEntity<Object> putIndividualInfoJob( //
+            @PathVariable("infoJobId") String jobId, //
+            @RequestBody String body) {
+        logger.debug("*** added consumer job {}", jobId);
+        ConsumerJobInfo informationJobObject = gson.fromJson(body, ConsumerJobInfo.class);
+        testResults.setCreatedJob(informationJobObject);
+        return new ResponseEntity<>(HttpStatus.OK);
+    }
+
+}
index fa3c43b..ca1fdd2 100644 (file)
@@ -74,7 +74,7 @@ class Integration {
 
     static class TestApplicationConfig extends ApplicationConfig {
         String thisProcessUrl() {
-            final String url = "https://localhost:" + getLocalServerHttpPort();
+            final String url = "https://localhost:" + getLocalServerHttpsPort();
             return url;
         }
     }
@@ -161,11 +161,12 @@ class Integration {
 
     }
 
+    final String PM_REPORT_FILE_BIG = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
+    final String PM_REPORT_FILE = "./src/test/resources/pm_report.json";
+
     String pmReport(int sequenceValue, int noOfObjects) {
         try {
-            String path = "./src/test/resources/pm_report.json";
-            // path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
-            String str = Files.readString(Path.of(path), Charset.defaultCharset());
+            String str = Files.readString(Path.of(PM_REPORT_FILE), Charset.defaultCharset());
             PmReport report = gson.fromJson(str, PmReport.class);
             PmReport.MeasDataCollection measDataCollection = report.event.getPerf3gppFields().getMeasDataCollection();