Stepping to springboot 3 29/10629/1
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 6 Feb 2023 08:20:50 +0000 (09:20 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 1 Mar 2023 12:23:06 +0000 (13:23 +0100)
Stepping to java 17.

Added an end marker "{}" when getting historical PM data.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-841
Change-Id: I0dddc42b775b7f6126690760ec97e6fa403a8d25

Dockerfile
api/api.json
api/api.yaml
pom.xml
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index ac70e36..9af0345 100644 (file)
@@ -20,7 +20,7 @@
 # ============LICENSE_END=========================================================
 
 
-FROM openjdk:11-jre-slim
+FROM openjdk:17-jdk-slim
 
 EXPOSE 8084 8435
 
index 6ba7582..a807d1a 100644 (file)
     "paths": {
         "/actuator/threaddump": {"get": {
             "summary": "Actuator web endpoint 'threaddump'",
-            "operationId": "threaddump_4",
+            "operationId": "threaddump",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {
+                    "text/plain;charset=UTF-8": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "tags": ["Actuator"]
         }},
         "/actuator/info": {"get": {
             "summary": "Actuator web endpoint 'info'",
-            "operationId": "info_2",
+            "operationId": "info",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "tags": ["Actuator"]
         }},
         },
         "/actuator/loggers": {"get": {
             "summary": "Actuator web endpoint 'loggers'",
-            "operationId": "loggers_2",
+            "operationId": "loggers",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "tags": ["Actuator"]
         }},
         "/actuator/health/**": {"get": {
             "summary": "Actuator web endpoint 'health-path'",
-            "operationId": "health-path_2",
+            "operationId": "health-path",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "tags": ["Actuator"]
         }},
         "/actuator/shutdown": {"post": {
             "summary": "Actuator web endpoint 'shutdown'",
-            "operationId": "shutdown_2",
+            "operationId": "shutdown",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "tags": ["Actuator"]
         }},
                 "tags": ["Information Coordinator Service Simulator (exists only in test)"]
             }
         },
-        "/generic_dataproducer/info_job/{infoJobId}": {"delete": {
-            "summary": "Callback for Information Job deletion",
-            "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
-            "operationId": "jobDeletedCallback",
+        "/actuator/metrics/{requiredMetricName}": {"get": {
+            "summary": "Actuator web endpoint 'metrics-requiredMetricName'",
+            "operationId": "metrics-requiredMetricName",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "parameters": [{
                 "schema": {"type": "string"},
                 "in": "path",
-                "name": "infoJobId",
+                "name": "requiredMetricName",
                 "required": true
             }],
-            "tags": ["Producer job control API"]
+            "tags": ["Actuator"]
         }},
-        "/actuator/metrics/{requiredMetricName}": {"get": {
-            "summary": "Actuator web endpoint 'metrics-requiredMetricName'",
-            "operationId": "metrics-requiredMetricName_2",
+        "/generic_dataproducer/info_job/{infoJobId}": {"delete": {
+            "summary": "Callback for Information Job deletion",
+            "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
+            "operationId": "jobDeletedCallback",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
             }},
             "parameters": [{
                 "schema": {"type": "string"},
                 "in": "path",
-                "name": "requiredMetricName",
+                "name": "infoJobId",
                 "required": true
             }],
-            "tags": ["Actuator"]
+            "tags": ["Producer job control API"]
         }},
         "/actuator": {"get": {
             "summary": "Actuator root web endpoint",
-            "operationId": "links_1",
+            "operationId": "links",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {
-                    "additionalProperties": {
-                        "additionalProperties": {"$ref": "#/components/schemas/Link"},
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {
+                        "additionalProperties": {
+                            "additionalProperties": {"$ref": "#/components/schemas/Link"},
+                            "type": "object"
+                        },
                         "type": "object"
-                    },
-                    "type": "object"
-                }}}
+                    }},
+                    "application/json": {"schema": {
+                        "additionalProperties": {
+                            "additionalProperties": {"$ref": "#/components/schemas/Link"},
+                            "type": "object"
+                        },
+                        "type": "object"
+                    }},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {
+                        "additionalProperties": {
+                            "additionalProperties": {"$ref": "#/components/schemas/Link"},
+                            "type": "object"
+                        },
+                        "type": "object"
+                    }}
+                }
             }},
             "tags": ["Actuator"]
         }},
         "/actuator/logfile": {"get": {
             "summary": "Actuator web endpoint 'logfile'",
-            "operationId": "logfile_2",
+            "operationId": "logfile",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {"text/plain;charset=UTF-8": {"schema": {"type": "object"}}}
             }},
             "tags": ["Actuator"]
         }},
         "/actuator/loggers/{name}": {
             "post": {
                 "summary": "Actuator web endpoint 'loggers-name'",
-                "operationId": "loggers-name_3",
+                "requestBody": {"content": {"application/json": {"schema": {
+                    "type": "string",
+                    "enum": [
+                        "TRACE",
+                        "DEBUG",
+                        "INFO",
+                        "WARN",
+                        "ERROR",
+                        "FATAL",
+                        "OFF"
+                    ]
+                }}}},
+                "operationId": "loggers-name_2",
                 "responses": {"200": {
                     "description": "OK",
                     "content": {"*/*": {"schema": {"type": "object"}}}
             },
             "get": {
                 "summary": "Actuator web endpoint 'loggers-name'",
-                "operationId": "loggers-name_4",
+                "operationId": "loggers-name",
                 "responses": {"200": {
                     "description": "OK",
-                    "content": {"*/*": {"schema": {"type": "object"}}}
+                    "content": {
+                        "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                        "application/json": {"schema": {"type": "object"}},
+                        "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                    }
                 }},
                 "parameters": [{
                     "schema": {"type": "string"},
         },
         "/actuator/health": {"get": {
             "summary": "Actuator web endpoint 'health'",
-            "operationId": "health_2",
+            "operationId": "health",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "tags": ["Actuator"]
         }},
         }},
         "/actuator/metrics": {"get": {
             "summary": "Actuator web endpoint 'metrics'",
-            "operationId": "metrics_2",
+            "operationId": "metrics",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {
+                    "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+                    "application/json": {"schema": {"type": "object"}},
+                    "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+                }
             }},
             "tags": ["Actuator"]
         }},
         "/actuator/heapdump": {"get": {
             "summary": "Actuator web endpoint 'heapdump'",
-            "operationId": "heapdump_2",
+            "operationId": "heapdump",
             "responses": {"200": {
                 "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
+                "content": {"application/octet-stream": {"schema": {"type": "object"}}}
             }},
             "tags": ["Actuator"]
         }}
         "title": "Generic Dmaap and Kafka Information Producer",
         "version": "1.0"
     },
-    "tags": [
-        {"name": "Information Coordinator Service Simulator (exists only in test)"},
-        {"name": "Producer job control API"},
-        {"name": "Test Consumer Simulator (exists only in test)"},
-        {"name": "DMAAP Simulator (exists only in test)"},
-        {
-            "name": "Actuator",
-            "description": "Monitor and interact",
-            "externalDocs": {
-                "description": "Spring Boot Actuator Web API Documentation",
-                "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/"
-            }
+    "tags": [{
+        "name": "Actuator",
+        "description": "Monitor and interact",
+        "externalDocs": {
+            "description": "Spring Boot Actuator Web API Documentation",
+            "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/"
         }
-    ]
+    }]
 }
\ No newline at end of file
index 024b43f..15883a6 100644 (file)
@@ -10,10 +10,6 @@ info:
 servers:
 - url: /
 tags:
-- name: Information Coordinator Service Simulator (exists only in test)
-- name: Producer job control API
-- name: Test Consumer Simulator (exists only in test)
-- name: DMAAP Simulator (exists only in test)
 - name: Actuator
   description: Monitor and interact
   externalDocs:
@@ -25,12 +21,21 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'threaddump'
-      operationId: threaddump_4
+      operationId: threaddump
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            text/plain;charset=UTF-8:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
   /actuator/info:
@@ -38,12 +43,18 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'info'
-      operationId: info_2
+      operationId: info
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
   /data-producer/v1/info-types/{infoTypeId}:
@@ -153,12 +164,18 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'loggers'
-      operationId: loggers_2
+      operationId: loggers
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
   /actuator/health/**:
@@ -166,12 +183,18 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'health-path'
-      operationId: health-path_2
+      operationId: health-path
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
   /actuator/shutdown:
@@ -179,12 +202,18 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'shutdown'
-      operationId: shutdown_2
+      operationId: shutdown
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
   /data-producer/v1/info-producers/{infoProducerId}:
@@ -232,16 +261,14 @@ paths:
             application/json:
               schema:
                 type: object
-  /generic_dataproducer/info_job/{infoJobId}:
-    delete:
+  /actuator/metrics/{requiredMetricName}:
+    get:
       tags:
-      - Producer job control API
-      summary: Callback for Information Job deletion
-      description: The call is invoked to terminate a data subscription. The endpoint
-        is provided by the Information Producer.
-      operationId: jobDeletedCallback
+      - Actuator
+      summary: Actuator web endpoint 'metrics-requiredMetricName'
+      operationId: metrics-requiredMetricName
       parameters:
-      - name: infoJobId
+      - name: requiredMetricName
         in: path
         required: true
         style: simple
@@ -252,17 +279,25 @@ paths:
         200:
           description: OK
           content:
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
             application/json:
               schema:
-                $ref: '#/components/schemas/void'
-  /actuator/metrics/{requiredMetricName}:
-    get:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
+              schema:
+                type: object
+  /generic_dataproducer/info_job/{infoJobId}:
+    delete:
       tags:
-      - Actuator
-      summary: Actuator web endpoint 'metrics-requiredMetricName'
-      operationId: metrics-requiredMetricName_2
+      - Producer job control API
+      summary: Callback for Information Job deletion
+      description: The call is invoked to terminate a data subscription. The endpoint
+        is provided by the Information Producer.
+      operationId: jobDeletedCallback
       parameters:
-      - name: requiredMetricName
+      - name: infoJobId
         in: path
         required: true
         style: simple
@@ -273,20 +308,34 @@ paths:
         200:
           description: OK
           content:
-            '*/*':
+            application/json:
               schema:
-                type: object
+                $ref: '#/components/schemas/void'
   /actuator:
     get:
       tags:
       - Actuator
       summary: Actuator root web endpoint
-      operationId: links_1
+      operationId: links
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+                additionalProperties:
+                  type: object
+                  additionalProperties:
+                    $ref: '#/components/schemas/Link'
+            application/json:
+              schema:
+                type: object
+                additionalProperties:
+                  type: object
+                  additionalProperties:
+                    $ref: '#/components/schemas/Link'
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
                 additionalProperties:
@@ -298,12 +347,12 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'logfile'
-      operationId: logfile_2
+      operationId: logfile
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            text/plain;charset=UTF-8:
               schema:
                 type: object
   /data-consumer/v1/info-jobs/{infoJobId}:
@@ -337,7 +386,7 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'loggers-name'
-      operationId: loggers-name_4
+      operationId: loggers-name
       parameters:
       - name: name
         in: path
@@ -350,14 +399,20 @@ paths:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
     post:
       tags:
       - Actuator
       summary: Actuator web endpoint 'loggers-name'
-      operationId: loggers-name_3
+      operationId: loggers-name_2
       parameters:
       - name: name
         in: path
@@ -366,6 +421,19 @@ paths:
         explode: false
         schema:
           type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              type: string
+              enum:
+              - TRACE
+              - DEBUG
+              - INFO
+              - WARN
+              - ERROR
+              - FATAL
+              - OFF
       responses:
         200:
           description: OK
@@ -378,12 +446,18 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'health'
-      operationId: health_2
+      operationId: health
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
   /dmaap-topic-2:
@@ -441,12 +515,18 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'metrics'
-      operationId: metrics_2
+      operationId: metrics
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/vnd.spring-boot.actuator.v3+json:
+              schema:
+                type: object
+            application/json:
+              schema:
+                type: object
+            application/vnd.spring-boot.actuator.v2+json:
               schema:
                 type: object
   /actuator/heapdump:
@@ -454,12 +534,12 @@ paths:
       tags:
       - Actuator
       summary: Actuator web endpoint 'heapdump'
-      operationId: heapdump_2
+      operationId: heapdump
       responses:
         200:
           description: OK
           content:
-            '*/*':
+            application/octet-stream:
               schema:
                 type: object
 components:
diff --git a/pom.xml b/pom.xml
index 93c33f0..fa38cdf 100644 (file)
--- a/pom.xml
+++ b/pom.xml
 <project
     xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.7.8</version>
+        <version>3.0.3</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric.plt</groupId>
         </repository>
     </repositories>
     <properties>
-        <java.version>11</java.version>
-        <springfox.version>3.0.0</springfox.version>
+        <java.version>17</java.version>
         <gson.version>2.9.0</gson.version>
-        <swagger.version>2.2.1</swagger.version>
         <json.version>20211205</json.version>
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
         <formatter-maven-plugin.version>2.12.2</formatter-maven-plugin.version>
         <exec.skip>true</exec.skip>
         <protobuf.version>4.0.0-rc-2</protobuf.version>
         <protobuf-java-format.version>1.4</protobuf-java-format.version>
+        <springdoc.version>2.0.2</springdoc.version>
     </properties>
     <dependencies>
+        <dependency>
+            <groupId>org.springdoc</groupId>
+            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
+            <version>${springdoc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springdoc</groupId>
+            <artifactId>springdoc-openapi-ui</artifactId>
+            <version>${springdoc.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
             <groupId>org.springframework</groupId>
             <artifactId>spring-webflux</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.swagger.core.v3</groupId>
-            <artifactId>swagger-jaxrs2</artifactId>
-            <version>${swagger.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.swagger.core.v3</groupId>
-            <artifactId>swagger-jaxrs2-servlet-initializer</artifactId>
-            <version>${swagger.version}</version>
-        </dependency>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
-        <!--REQUIRED TO GENERATE DOCUMENTATION -->
-        <dependency>
-            <groupId>io.springfox</groupId>
-            <artifactId>springfox-swagger2</artifactId>
-            <version>${springfox.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.springfox</groupId>
-            <artifactId>springfox-swagger-ui</artifactId>
-            <version>${springfox.version}</version>
-        </dependency>
         <!-- For development help -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
index b235bdc..db8a272 100644 (file)
@@ -37,7 +37,6 @@ import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.FilterFactory;
-import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index ef493a5..5e150dd 100644 (file)
@@ -119,10 +119,18 @@ public abstract class JobDataDistributor {
                     .map(this::gzip) //
                     .flatMap(this::sendToClient, 1) //
                     .onErrorResume(this::handleCollectHistoricalDataError) //
+                    .doFinally(sig -> sendLastStoredRecord()) //
                     .subscribe();
         }
     }
 
+    private void sendLastStoredRecord() {
+        String data = "{}";
+        Filter.FilteredData output = new Filter.FilteredData(this.jobGroup.getType().getId(), null, data.getBytes());
+
+        sendToClient(output).subscribe();
+    }
+
     private static PmReportFilter getPmReportFilter(JobGroup jobGroup) {
 
         if (jobGroup instanceof JobGroupPm) {
index da64e0c..4d8e701 100644 (file)
@@ -69,8 +69,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
 import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
-import org.springframework.boot.web.server.LocalServerPort;
 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.http.HttpStatus;
@@ -395,6 +395,7 @@ class ApplicationTest {
                 Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
 
         String paramJson = gson.toJson(param);
+        System.out.println(paramJson);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
@@ -450,7 +451,9 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
-        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
+
+        assertThat(consumer.receivedBodies.get(1)).isEqualTo("{}"); // End marker
     }
 
     @Test
index 4146212..67c9aa5 100644 (file)
@@ -294,16 +294,18 @@ class IntegrationWithIcs {
     @Test
     void testPmFilter() throws Exception {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
-        final String TYPE_ID = "PmDataOverRest";
+        final String TYPE_ID = "KafkaInformationType";
 
-        String jsonStr = pmJobParameters();
+        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
 
-        ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), "");
+        ConsumerJobInfo jobInfo = IntegrationWithKafka
+                .consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), TYPE_ID, filterData);
 
         createInformationJobInIcs(DMAAP_JOB_ID, jobInfo);
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         deleteInformationJobInIcs(DMAAP_JOB_ID);
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
     }
 }
index 12bec80..b9767f5 100644 (file)
@@ -72,8 +72,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
 import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
-import org.springframework.boot.web.server.LocalServerPort;
 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.TestPropertySource;
@@ -97,7 +97,7 @@ import reactor.kafka.sender.SenderRecord;
 class IntegrationWithKafka {
 
     final String KAFKA_TYPE_ID = "KafkaInformationType";
-    final String PM_TYPE_ID = "PmDataOverKafka";
+    final static String PM_TYPE_ID = "PmDataOverKafka";
 
     @Autowired
     private ApplicationConfig applicationConfig;
@@ -334,11 +334,12 @@ class IntegrationWithKafka {
         }
     }
 
-    ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+    public static ConsumerJobInfo consumerJobInfoKafka(String kafkaBootstrapServers, String topic,
+            PmReportFilter.FilterData filterData) {
         try {
             Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
                     .topic(topic) //
-                    .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+                    .bootStrapServers(kafkaBootstrapServers) //
                     .build();
             Job.Parameters param = Job.Parameters.builder() //
                     .filter(filterData) //
@@ -592,10 +593,10 @@ class IntegrationWithKafka {
 
         filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
 
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
-                restClient());
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
-                restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
+                kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
+                kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
@@ -648,8 +649,9 @@ class IntegrationWithKafka {
         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
         for (int i = 0; i < NO_OF_JOBS; ++i) {
             final String outputTopic = "manyJobs_" + i;
-            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
-                    restClient());
+            this.icsSimulatorController.addJob(
+                    consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
+                    outputTopic, restClient());
             KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
             receivers.add(receiver);
         }
@@ -710,7 +712,9 @@ class IntegrationWithKafka {
             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
             filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
 
-            this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
+            this.icsSimulatorController.addJob(
+                    consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
+                    jobId, restClient());
 
             KafkaReceiver receiver =
                     new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
@@ -793,8 +797,8 @@ class IntegrationWithKafka {
 
         filterData.setPmRopEndTime(OffsetDateTime.now().toString());
 
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
-                restClient());
+        this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
+                kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());