Stepping to java 17.
Added an end marker "{}" when getting historical PM data.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-841
Change-Id: I0dddc42b775b7f6126690760ec97e6fa403a8d25
# ============LICENSE_END=========================================================
-FROM openjdk:11-jre-slim
+FROM openjdk:17-jdk-slim
EXPOSE 8084 8435
"paths": {
"/actuator/threaddump": {"get": {
"summary": "Actuator web endpoint 'threaddump'",
- "operationId": "threaddump_4",
+ "operationId": "threaddump",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "text/plain;charset=UTF-8": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"tags": ["Actuator"]
}},
"/actuator/info": {"get": {
"summary": "Actuator web endpoint 'info'",
- "operationId": "info_2",
+ "operationId": "info",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"tags": ["Actuator"]
}},
},
"/actuator/loggers": {"get": {
"summary": "Actuator web endpoint 'loggers'",
- "operationId": "loggers_2",
+ "operationId": "loggers",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"tags": ["Actuator"]
}},
"/actuator/health/**": {"get": {
"summary": "Actuator web endpoint 'health-path'",
- "operationId": "health-path_2",
+ "operationId": "health-path",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"tags": ["Actuator"]
}},
"/actuator/shutdown": {"post": {
"summary": "Actuator web endpoint 'shutdown'",
- "operationId": "shutdown_2",
+ "operationId": "shutdown",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"tags": ["Actuator"]
}},
"tags": ["Information Coordinator Service Simulator (exists only in test)"]
}
},
- "/generic_dataproducer/info_job/{infoJobId}": {"delete": {
- "summary": "Callback for Information Job deletion",
- "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
- "operationId": "jobDeletedCallback",
+ "/actuator/metrics/{requiredMetricName}": {"get": {
+ "summary": "Actuator web endpoint 'metrics-requiredMetricName'",
+ "operationId": "metrics-requiredMetricName",
"responses": {"200": {
"description": "OK",
- "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"parameters": [{
"schema": {"type": "string"},
"in": "path",
- "name": "infoJobId",
+ "name": "requiredMetricName",
"required": true
}],
- "tags": ["Producer job control API"]
+ "tags": ["Actuator"]
}},
- "/actuator/metrics/{requiredMetricName}": {"get": {
- "summary": "Actuator web endpoint 'metrics-requiredMetricName'",
- "operationId": "metrics-requiredMetricName_2",
+ "/generic_dataproducer/info_job/{infoJobId}": {"delete": {
+ "summary": "Callback for Information Job deletion",
+ "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
+ "operationId": "jobDeletedCallback",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
}},
"parameters": [{
"schema": {"type": "string"},
"in": "path",
- "name": "requiredMetricName",
+ "name": "infoJobId",
"required": true
}],
- "tags": ["Actuator"]
+ "tags": ["Producer job control API"]
}},
"/actuator": {"get": {
"summary": "Actuator root web endpoint",
- "operationId": "links_1",
+ "operationId": "links",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {
- "additionalProperties": {
- "additionalProperties": {"$ref": "#/components/schemas/Link"},
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {
+ "additionalProperties": {
+ "additionalProperties": {"$ref": "#/components/schemas/Link"},
+ "type": "object"
+ },
"type": "object"
- },
- "type": "object"
- }}}
+ }},
+ "application/json": {"schema": {
+ "additionalProperties": {
+ "additionalProperties": {"$ref": "#/components/schemas/Link"},
+ "type": "object"
+ },
+ "type": "object"
+ }},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {
+ "additionalProperties": {
+ "additionalProperties": {"$ref": "#/components/schemas/Link"},
+ "type": "object"
+ },
+ "type": "object"
+ }}
+ }
}},
"tags": ["Actuator"]
}},
"/actuator/logfile": {"get": {
"summary": "Actuator web endpoint 'logfile'",
- "operationId": "logfile_2",
+ "operationId": "logfile",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {"text/plain;charset=UTF-8": {"schema": {"type": "object"}}}
}},
"tags": ["Actuator"]
}},
"/actuator/loggers/{name}": {
"post": {
"summary": "Actuator web endpoint 'loggers-name'",
- "operationId": "loggers-name_3",
+ "requestBody": {"content": {"application/json": {"schema": {
+ "type": "string",
+ "enum": [
+ "TRACE",
+ "DEBUG",
+ "INFO",
+ "WARN",
+ "ERROR",
+ "FATAL",
+ "OFF"
+ ]
+ }}}},
+ "operationId": "loggers-name_2",
"responses": {"200": {
"description": "OK",
"content": {"*/*": {"schema": {"type": "object"}}}
},
"get": {
"summary": "Actuator web endpoint 'loggers-name'",
- "operationId": "loggers-name_4",
+ "operationId": "loggers-name",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"parameters": [{
"schema": {"type": "string"},
},
"/actuator/health": {"get": {
"summary": "Actuator web endpoint 'health'",
- "operationId": "health_2",
+ "operationId": "health",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"tags": ["Actuator"]
}},
}},
"/actuator/metrics": {"get": {
"summary": "Actuator web endpoint 'metrics'",
- "operationId": "metrics_2",
+ "operationId": "metrics",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {
+ "application/vnd.spring-boot.actuator.v3+json": {"schema": {"type": "object"}},
+ "application/json": {"schema": {"type": "object"}},
+ "application/vnd.spring-boot.actuator.v2+json": {"schema": {"type": "object"}}
+ }
}},
"tags": ["Actuator"]
}},
"/actuator/heapdump": {"get": {
"summary": "Actuator web endpoint 'heapdump'",
- "operationId": "heapdump_2",
+ "operationId": "heapdump",
"responses": {"200": {
"description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
+ "content": {"application/octet-stream": {"schema": {"type": "object"}}}
}},
"tags": ["Actuator"]
}}
"title": "Generic Dmaap and Kafka Information Producer",
"version": "1.0"
},
- "tags": [
- {"name": "Information Coordinator Service Simulator (exists only in test)"},
- {"name": "Producer job control API"},
- {"name": "Test Consumer Simulator (exists only in test)"},
- {"name": "DMAAP Simulator (exists only in test)"},
- {
- "name": "Actuator",
- "description": "Monitor and interact",
- "externalDocs": {
- "description": "Spring Boot Actuator Web API Documentation",
- "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/"
- }
+ "tags": [{
+ "name": "Actuator",
+ "description": "Monitor and interact",
+ "externalDocs": {
+ "description": "Spring Boot Actuator Web API Documentation",
+ "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/"
}
- ]
+ }]
}
\ No newline at end of file
servers:
- url: /
tags:
-- name: Information Coordinator Service Simulator (exists only in test)
-- name: Producer job control API
-- name: Test Consumer Simulator (exists only in test)
-- name: DMAAP Simulator (exists only in test)
- name: Actuator
description: Monitor and interact
externalDocs:
tags:
- Actuator
summary: Actuator web endpoint 'threaddump'
- operationId: threaddump_4
+ operationId: threaddump
responses:
200:
description: OK
content:
- '*/*':
+ text/plain;charset=UTF-8:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
/actuator/info:
tags:
- Actuator
summary: Actuator web endpoint 'info'
- operationId: info_2
+ operationId: info
responses:
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
/data-producer/v1/info-types/{infoTypeId}:
tags:
- Actuator
summary: Actuator web endpoint 'loggers'
- operationId: loggers_2
+ operationId: loggers
responses:
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
/actuator/health/**:
tags:
- Actuator
summary: Actuator web endpoint 'health-path'
- operationId: health-path_2
+ operationId: health-path
responses:
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
/actuator/shutdown:
tags:
- Actuator
summary: Actuator web endpoint 'shutdown'
- operationId: shutdown_2
+ operationId: shutdown
responses:
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
/data-producer/v1/info-producers/{infoProducerId}:
application/json:
schema:
type: object
- /generic_dataproducer/info_job/{infoJobId}:
- delete:
+ /actuator/metrics/{requiredMetricName}:
+ get:
tags:
- - Producer job control API
- summary: Callback for Information Job deletion
- description: The call is invoked to terminate a data subscription. The endpoint
- is provided by the Information Producer.
- operationId: jobDeletedCallback
+ - Actuator
+ summary: Actuator web endpoint 'metrics-requiredMetricName'
+ operationId: metrics-requiredMetricName
parameters:
- - name: infoJobId
+ - name: requiredMetricName
in: path
required: true
style: simple
200:
description: OK
content:
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
application/json:
schema:
- $ref: '#/components/schemas/void'
- /actuator/metrics/{requiredMetricName}:
- get:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
+ schema:
+ type: object
+ /generic_dataproducer/info_job/{infoJobId}:
+ delete:
tags:
- - Actuator
- summary: Actuator web endpoint 'metrics-requiredMetricName'
- operationId: metrics-requiredMetricName_2
+ - Producer job control API
+ summary: Callback for Information Job deletion
+ description: The call is invoked to terminate a data subscription. The endpoint
+ is provided by the Information Producer.
+ operationId: jobDeletedCallback
parameters:
- - name: requiredMetricName
+ - name: infoJobId
in: path
required: true
style: simple
200:
description: OK
content:
- '*/*':
+ application/json:
schema:
- type: object
+ $ref: '#/components/schemas/void'
/actuator:
get:
tags:
- Actuator
summary: Actuator root web endpoint
- operationId: links_1
+ operationId: links
responses:
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ additionalProperties:
+ type: object
+ additionalProperties:
+ $ref: '#/components/schemas/Link'
+ application/json:
+ schema:
+ type: object
+ additionalProperties:
+ type: object
+ additionalProperties:
+ $ref: '#/components/schemas/Link'
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
additionalProperties:
tags:
- Actuator
summary: Actuator web endpoint 'logfile'
- operationId: logfile_2
+ operationId: logfile
responses:
200:
description: OK
content:
- '*/*':
+ text/plain;charset=UTF-8:
schema:
type: object
/data-consumer/v1/info-jobs/{infoJobId}:
tags:
- Actuator
summary: Actuator web endpoint 'loggers-name'
- operationId: loggers-name_4
+ operationId: loggers-name
parameters:
- name: name
in: path
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
post:
tags:
- Actuator
summary: Actuator web endpoint 'loggers-name'
- operationId: loggers-name_3
+ operationId: loggers-name_2
parameters:
- name: name
in: path
explode: false
schema:
type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ type: string
+ enum:
+ - TRACE
+ - DEBUG
+ - INFO
+ - WARN
+ - ERROR
+ - FATAL
+ - OFF
responses:
200:
description: OK
tags:
- Actuator
summary: Actuator web endpoint 'health'
- operationId: health_2
+ operationId: health
responses:
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
/dmaap-topic-2:
tags:
- Actuator
summary: Actuator web endpoint 'metrics'
- operationId: metrics_2
+ operationId: metrics
responses:
200:
description: OK
content:
- '*/*':
+ application/vnd.spring-boot.actuator.v3+json:
+ schema:
+ type: object
+ application/json:
+ schema:
+ type: object
+ application/vnd.spring-boot.actuator.v2+json:
schema:
type: object
/actuator/heapdump:
tags:
- Actuator
summary: Actuator web endpoint 'heapdump'
- operationId: heapdump_2
+ operationId: heapdump
responses:
200:
description: OK
content:
- '*/*':
+ application/octet-stream:
schema:
type: object
components:
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.7.8</version>
+ <version>3.0.3</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric.plt</groupId>
</repository>
</repositories>
<properties>
- <java.version>11</java.version>
- <springfox.version>3.0.0</springfox.version>
+ <java.version>17</java.version>
<gson.version>2.9.0</gson.version>
- <swagger.version>2.2.1</swagger.version>
<json.version>20211205</json.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<formatter-maven-plugin.version>2.12.2</formatter-maven-plugin.version>
<exec.skip>true</exec.skip>
<protobuf.version>4.0.0-rc-2</protobuf.version>
<protobuf-java-format.version>1.4</protobuf-java-format.version>
+ <springdoc.version>2.0.2</springdoc.version>
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.springdoc</groupId>
+ <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
+ <version>${springdoc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springdoc</groupId>
+ <artifactId>springdoc-openapi-ui</artifactId>
+ <version>${springdoc.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>
- <dependency>
- <groupId>io.swagger.core.v3</groupId>
- <artifactId>swagger-jaxrs2</artifactId>
- <version>${swagger.version}</version>
- </dependency>
- <dependency>
- <groupId>io.swagger.core.v3</groupId>
- <artifactId>swagger-jaxrs2-servlet-initializer</artifactId>
- <version>${swagger.version}</version>
- </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
- <!--REQUIRED TO GENERATE DOCUMENTATION -->
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger2</artifactId>
- <version>${springfox.version}</version>
- </dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger-ui</artifactId>
- <version>${springfox.version}</version>
- </dependency>
<!-- For development help -->
<dependency>
<groupId>org.springframework.boot</groupId>
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.FilterFactory;
-import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
.map(this::gzip) //
.flatMap(this::sendToClient, 1) //
.onErrorResume(this::handleCollectHistoricalDataError) //
+ .doFinally(sig -> sendLastStoredRecord()) //
.subscribe();
}
}
+ private void sendLastStoredRecord() {
+ String data = "{}";
+ Filter.FilteredData output = new Filter.FilteredData(this.jobGroup.getType().getId(), null, data.getBytes());
+
+ sendToClient(output).subscribe();
+ }
+
private static PmReportFilter getPmReportFilter(JobGroup jobGroup) {
if (jobGroup instanceof JobGroupPm) {
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
-import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
String paramJson = gson.toJson(param);
+ System.out.println(paramJson);
ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
ConsumerController.TestResults consumer = this.consumerController.testResults;
- await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
+
+ assertThat(consumer.receivedBodies.get(1)).isEqualTo("{}"); // End marker
}
@Test
@Test
void testPmFilter() throws Exception {
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
- final String TYPE_ID = "PmDataOverRest";
+ final String TYPE_ID = "KafkaInformationType";
- String jsonStr = pmJobParameters();
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), "");
+ ConsumerJobInfo jobInfo = IntegrationWithKafka
+ .consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), TYPE_ID, filterData);
createInformationJobInIcs(DMAAP_JOB_ID, jobInfo);
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
deleteInformationJobInIcs(DMAAP_JOB_ID);
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
}
}
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
-import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.TestPropertySource;
class IntegrationWithKafka {
final String KAFKA_TYPE_ID = "KafkaInformationType";
- final String PM_TYPE_ID = "PmDataOverKafka";
+ final static String PM_TYPE_ID = "PmDataOverKafka";
@Autowired
private ApplicationConfig applicationConfig;
}
}
- ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+ public static ConsumerJobInfo consumerJobInfoKafka(String kafkaBootstrapServers, String topic,
+ PmReportFilter.FilterData filterData) {
try {
Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
.topic(topic) //
- .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+ .bootStrapServers(kafkaBootstrapServers) //
.build();
Job.Parameters param = Job.Parameters.builder() //
.filter(filterData) //
filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
- this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
- restClient());
- this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
- restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
+ kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
+ kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
ArrayList<KafkaReceiver> receivers = new ArrayList<>();
for (int i = 0; i < NO_OF_JOBS; ++i) {
final String outputTopic = "manyJobs_" + i;
- this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
- restClient());
+ this.icsSimulatorController.addJob(
+ consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
+ outputTopic, restClient());
KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
receivers.add(receiver);
}
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
- this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
+ this.icsSimulatorController.addJob(
+ consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
+ jobId, restClient());
KafkaReceiver receiver =
new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
filterData.setPmRopEndTime(OffsetDateTime.now().toString());
- this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
- restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
+ kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());