DMaaP adapter, updated job definition syntax
author     PatrikBuhr <patrik.buhr@est.tech>
           Thu, 5 Jan 2023 13:06:38 +0000 (14:06 +0100)
committer  PatrikBuhr <patrik.buhr@est.tech>
           Tue, 10 Jan 2023 08:49:30 +0000 (09:49 +0100)
The filter syntax is updated so that measTypes are structured below measuredObjClass.
The job definition is updated so that the output definition contains both the topic and the Kafka bootstrap servers.
Added graceful shutdown of the DMaaP Adapter.

Example of PM definition after the change:

{
   "filter":{
      "sourceNames":[
         "NodeA", "NodeB"
      ],
      "measObjInstIds":[

      ],
      "measTypeSpecs":[
         {
            "measuredObjClass":"NRCellCU",
            "measTypes":[
               "pmCounterNumber0"
            ]
         }
      ],
      "measuredEntityDns":[

      ]
   },
   "deliveryInfo":{
      "topic":"ouputTopic",
      "bootStrapServers":"localhost:9092"
   }
}
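
For reference, a minimal sketch of building the same definition with the new
API (mirroring the updated IntegrationWithKafka test code; the topic and
server values are illustrative):

    import org.oran.dmaapadapter.filter.PmReportFilter;
    import org.oran.dmaapadapter.repository.Job;

    PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
    filterData.getSourceNames().add("NodeA");
    filterData.getSourceNames().add("NodeB");
    // measTypes are now grouped under their measuredObjClass
    filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");

    Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
            .topic("outputTopic") //
            .bootStrapServers("localhost:9092") //
            .build();
    Job.Parameters param = Job.Parameters.builder() //
            .filter(filterData) //
            .filterType(Job.Parameters.PM_FILTER_TYPE) //
            .deliveryInfo(deliveryInfo) //
            .build();
    // Serializing param with Gson yields a job definition like the one above.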

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-831
Change-Id: Ifbdeb676fce22eaff3c6e35eb50c76a661ea27fa

18 files changed:
api/api.json
api/api.yaml
config/application.yaml
docs/overview.rst
src/main/java/org/oran/dmaapadapter/Application.java
src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/main/resources/typeSchema.json
src/main/resources/typeSchemaPmData.json
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java
src/test/resources/pm_report.json

index 3ec5a4d..6ba7582 100644 (file)
             }},
             "tags": ["Actuator"]
         }},
+        "/actuator/shutdown": {"post": {
+            "summary": "Actuator web endpoint 'shutdown'",
+            "operationId": "shutdown_2",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
         "/data-producer/v1/info-producers/{infoProducerId}": {
             "get": {
                 "operationId": "getInfoProducer",
index b2eaf15..024b43f 100644 (file)
@@ -174,6 +174,19 @@ paths:
             '*/*':
               schema:
                 type: object
+  /actuator/shutdown:
+    post:
+      tags:
+      - Actuator
+      summary: Actuator web endpoint 'shutdown'
+      operationId: shutdown_2
+      responses:
+        200:
+          description: OK
+          content:
+            '*/*':
+              schema:
+                type: object
   /data-producer/v1/info-producers/{infoProducerId}:
     get:
       tags:
index 8975849..57c6e31 100644 (file)
@@ -27,7 +27,12 @@ management:
     web:
       exposure:
         # Enabling of springboot actuator features. See springboot documentation.
-        include: "loggers,logfile,health,info,metrics,threaddump,heapdump"
+        include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown"
+  endpoint:
+    shutdown:
+      enabled: true
+lifecycle:
+  timeout-per-shutdown-phase: "20s"
 springdoc:
   show-actuator: true
 logging:
@@ -48,14 +53,15 @@ logging:
 server:
    # Configuration of the HTTP/REST server. The parameters are defined and handled by the springboot framework.
    # See springboot documentation.
-   port : 8435
-   http-port: 8084
-   ssl:
-      key-store-type: JKS
-      key-store-password: policy_agent
-      key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
-      key-password: policy_agent
-      key-alias: policy_agent
+  port : 8435
+  http-port: 8084
+  ssl:
+    key-store-type: JKS
+    key-store-password: policy_agent
+    key-store: /opt/app/dmaap-adapter-service/etc/cert/keystore.jks
+    key-password: policy_agent
+    key-alias: policy_agent
+  shutdown: "graceful"
 app:
   webclient:
     # Configuration of the trust store used for the HTTP client (outgoing requests)
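
The configuration above enables the "shutdown" actuator endpoint and graceful
shutdown. A hedged sketch of triggering it, as the new testZZActuator does
(the AsyncRestClient construction is elided since base URL and TLS setup are
environment specific):

    import org.oran.dmaapadapter.clients.AsyncRestClient;

    void shutdownService(AsyncRestClient client) {
        // POST with an empty body; enabled by management.endpoint.shutdown.enabled=true.
        client.post("/actuator/shutdown", "").block();
        // Spring then waits up to lifecycle.timeout-per-shutdown-phase ("20s")
        // for ongoing work before stopping the web server.
    }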
index cd80ce0..aebb5cb 100644 (file)
@@ -116,7 +116,10 @@ typeSchema.json
 ===============
 This schema will by default be registered for the type. The following properties are defined:
 
-* kafkaOutputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless if the input is retrieved from DMaaP or from Kafka.
+* outputTopic, optional parameter which enables the information job to output data to a Kafka topic instead of a direct call to a data consumer. The output of a job can be directed to HTTP or to Kafka regardless of whether the input is retrieved from DMaaP or from Kafka. It consists of the following properties:
+
+  * topic, the name of the Kafka topic.
+  * bootStrapServers, reference to the Kafka bus to use. This is optional; if omitted, the default configured Kafka bus is used (configured in the application.yaml file).
 
 * filterType, selects the type of filtering that will be done. This can be one of: "regexp", "json-path", "jslt".
 
@@ -186,9 +189,11 @@ The filterType parameter is extended to allow value "pmdata" which can be used f
 * sourceNames an array of source names for wanted PM reports.
 * measObjInstIds an array of meas object instances for wanted PM reports. If a given filter value is contained in the filter definition, it will match (partial matching).
   For instance a value like "NRCellCU" will match "ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32".
-* measTypes selects the meas types to get
+* measTypeSpecs selects the meas types to get. Each entry consists of:
+
+  * measuredObjClass, the class of the measObjInstId to match. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
+  * measTypes, the names of the measurement types (connected to the measuredObjClass).
 * measuredEntityDns partial match of meas entity DNs.
-* measObjClass matching of the class of the measObjInstId. The measObjInstId must follow the 3GPP naming conventions for Managed Objects (3GPP TS 32.106-8).
   Example, for a distinguished name "ManagedElement=RNC-Gbg-1,ENodeBFunction=1", the MO class will be "ENodeBFunction".
 * pmRopStartTime gives a possibility to query from already collected PM files. The start file is the time from when the information shall be returned.
   In this case, the query is only done for files from the given "sourceNames".
@@ -209,15 +214,17 @@ Below follows an example of a PM filter.
         "measObjInstIds":[
            "UtranCell=dGbg-997"
         ],
-        "measTypes":[
-           "succImmediateAssignProcs"
-        ],
+        "measTypeSpecs":[
+             {
+                "measuredObjClass":"UtranCell",
+                "measTypes":[
+                   "succImmediateAssignProcs"
+                ]
+             }
+          ],
         "measuredEntityDns":[
            "ManagedElement=RNC-Gbg-1"
         ],
-        "measObjClass":[
-           "UtranCell"
-        ]
         "pmRopStartTime" : "2022-12-13T10:50:44.000-08:00"
       }
     }
@@ -234,12 +241,15 @@ match two counters from all cells in two traffical nodes.
         "sourceNames":[
            "O-DU-1122", "O-DU-1123"
         ],
-        "measTypes":[
-           "succImmediateAssignProcs", "attTCHSeizures"
-        ],
-        "measObjClass":[
-           "UtranCell"
-        ]
+        "measTypeSpecs":[
+             {
+                "measuredObjClass":"NRCellCU",
+                "measTypes":[
+                   "pmCounterNumber0", "pmCounterNumber1"
+                ]
+             }
+          ]
       }
     }
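
A hedged Java counterpart to the first filter example above, following the
updated PmReportFilterTest; pmReportJson stands in for a PM report such as
src/test/resources/pm_report.json:

    import org.oran.dmaapadapter.filter.Filter.FilteredData;
    import org.oran.dmaapadapter.filter.PmReportFilter;
    import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;

    PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
    filterData.getSourceNames().add("O-DU-1122");
    filterData.getMeasObjInstIds().add("UtranCell=dGbg-997");
    filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
    filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");

    PmReportFilter filter = new PmReportFilter(filterData);
    DataFromTopic data = new DataFromTopic("typeId", null, null, pmReportJson.getBytes());
    FilteredData filtered = filter.filter(data); // only matching counters remain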
 
index 2058202..7201d4f 100644 (file)
@@ -48,6 +48,14 @@ public class Application {
 
     public static void main(String[] args) {
         applicationContext = SpringApplication.run(Application.class);
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                logger.warn("Shutting down, received signal SIGTERM");
+                SpringApplication.exit(applicationContext);
+                applicationContext = null;
+            }
+        });
     }
 
     @Scheduled(fixedRate = 10 * 1000)
index a6e2444..3ba96ab 100644 (file)
@@ -118,6 +118,22 @@ public class AsyncRestClient {
                 .map(this::toBody);
     }
 
+    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+        Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .body(bodyProducer, String.class);
+        return retrieve(request);
+    }
+
+    public Mono<String> post(String uri, @Nullable String body) {
+        return postForEntity(uri, body) //
+                .map(this::toBody);
+    }
+
     private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
         if (securityContext.isConfigured()) {
             request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
index b53383a..d7561da 100644 (file)
@@ -162,7 +162,7 @@ public class ApplicationConfig {
     }
 
     public Collection<InfoType> getTypes() {
-        com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+        com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
         try {
             String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset());
             ConfigFile configData = gson.fromJson(configJson, ConfigFile.class);
index 04834c4..db1a018 100644 (file)
@@ -62,11 +62,53 @@ public class PmReportFilter implements Filter {
 
     @Getter
     public static class FilterData {
+
+        public static class MeasTypeSpec {
+            static MeasTypeSpec empty = new MeasTypeSpec();
+
+            static MeasTypeSpec empty() {
+                return empty;
+            }
+
+            @Getter
+            String measuredObjClass;
+
+            @Getter
+            final Set<String> measTypes = new HashSet<>();
+
+            @Override
+            public boolean equals(Object obj) {
+                return measuredObjClass.equals(obj);
+            }
+
+            @Override
+            public int hashCode() {
+                return measuredObjClass.hashCode();
+            }
+        }
+
         final Set<String> sourceNames = new HashSet<>();
         final Set<String> measObjInstIds = new HashSet<>();
-        final Set<String> measTypes = new HashSet<>();
+        final Collection<MeasTypeSpec> measTypeSpecs = new ArrayList<>();
         final Set<String> measuredEntityDns = new HashSet<>();
-        final Set<String> measObjClass = new HashSet<>();
+
+        public void addMeasTypes(String measObjClass, String... measTypes) {
+            MeasTypeSpec spec = this.findMeasTypeSpec(measObjClass);
+            if (spec == null) {
+                spec = new MeasTypeSpec();
+                spec.measuredObjClass = measObjClass;
+                this.measTypeSpecs.add(spec);
+            }
+            for (String measType : measTypes) {
+                spec.measTypes.add(measType);
+            }
+        }
+
+        public void addMeasTypes(String measObjClass, Collection<String> measTypes) {
+            for (String measType : measTypes) {
+                addMeasTypes(measObjClass, measType);
+            }
+        }
 
         @Setter
         String pmRopStartTime;
@@ -77,9 +119,30 @@ public class PmReportFilter implements Filter {
         public void addAll(FilterData other) {
             addAll(other.sourceNames, sourceNames);
             addAll(other.measObjInstIds, measObjInstIds);
-            addAll(other.measTypes, measTypes);
+            addAll(other.measTypeSpecs);
             addAll(other.measuredEntityDns, measuredEntityDns);
-            addAll(other.measObjClass, measObjClass);
+        }
+
+        public MeasTypeSpec getMeasTypeSpec(String measuredObjClass) {
+            if (measTypeSpecs.isEmpty()) {
+                return MeasTypeSpec.empty();
+            }
+            return findMeasTypeSpec(measuredObjClass);
+        }
+
+        private MeasTypeSpec findMeasTypeSpec(String measuredObjClass) {
+            for (MeasTypeSpec t : this.measTypeSpecs) {
+                if (t.measuredObjClass.equals(measuredObjClass)) {
+                    return t;
+                }
+            }
+            return null;
+        }
+
+        private void addAll(Collection<MeasTypeSpec> measTypes) {
+            for (MeasTypeSpec s : measTypes) {
+                addMeasTypes(s.getMeasuredObjClass(), s.getMeasTypes());
+            }
         }
 
         private void addAll(Set<String> source, Set<String> dst) {
@@ -185,17 +248,18 @@ public class PmReportFilter implements Filter {
         return false;
     }
 
-    private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes, FilterData filter) {
+    private boolean isMeasResultMatch(MeasResult measResult, MeasTypes measTypes,
+            FilterData.MeasTypeSpec measTypesSpec) {
         String measType = measTypes.getMeasType(measResult.getP());
-        return filter.measTypes.isEmpty() || filter.measTypes.contains(measType);
+        return measTypesSpec.measTypes.isEmpty() || measTypesSpec.measTypes.contains(measType);
     }
 
     private Collection<MeasResult> createMeasResults(Collection<MeasResult> oldMeasResults, MeasTypes measTypes,
-            FilterData filter) {
+            FilterData.MeasTypeSpec measTypesSpec) {
         Collection<MeasResult> newMeasResults = new ArrayList<>();
 
         for (MeasResult measResult : oldMeasResults) {
-            if (isMeasResultMatch(measResult, measTypes, filter)) {
+            if (isMeasResultMatch(measResult, measTypes, measTypesSpec)) {
                 newMeasResults.add(measResult.toBuilder().build());
             }
         }
@@ -218,27 +282,25 @@ public class PmReportFilter implements Filter {
         return distinguishedName.substring(lastRdn + 1, lastEqualChar);
     }
 
-    private boolean isMeasInstClassMatch(String measObjInstId, FilterData filter) {
-        if (filter.measObjClass.isEmpty()) {
-            return true;
-        }
-
+    private FilterData.MeasTypeSpec getMeasTypeSpec(String measObjInstId, FilterData filter) {
         String measObjClass = managedObjectClass(measObjInstId);
-        return filter.measObjClass.contains(measObjClass);
+        return filter.getMeasTypeSpec(measObjClass);
     }
 
     private MeasValuesList createMeasValuesList(MeasValuesList oldMeasValues, MeasTypes measTypes, FilterData filter) {
+        FilterData.MeasTypeSpec measTypesSpec = getMeasTypeSpec(oldMeasValues.getMeasObjInstId(), filter);
+        if (measTypesSpec == null) {
+            return MeasValuesList.empty();
+        }
 
-        if (isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)
-                && isMeasInstClassMatch(oldMeasValues.getMeasObjInstId(), filter)) {
-
-            Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, filter);
-            return oldMeasValues.toBuilder() //
-                    .measResults(newResults) //
-                    .build();
-        } else {
+        if (!isMeasInstIdMatch(oldMeasValues.getMeasObjInstId(), filter)) {
             return MeasValuesList.empty();
         }
+
+        Collection<MeasResult> newResults = createMeasResults(oldMeasValues.getMeasResults(), measTypes, measTypesSpec);
+        return oldMeasValues.toBuilder() //
+                .measResults(newResults) //
+                .build();
     }
 
     private MeasTypes createMeasTypes(Collection<MeasValuesList> newMeasValues, MeasTypes oldMMeasTypes) {
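
For clarity, a hedged sketch of the MO-class extraction that getMeasTypeSpec
relies on, consistent with the substring logic in the hunk above: the class is
the key of the last RDN of the 3GPP distinguished name.

    static String managedObjectClass(String distinguishedName) {
        int lastRdn = distinguishedName.lastIndexOf(',');       // start of the last RDN
        int lastEqualChar = distinguishedName.lastIndexOf('='); // its key/value separator
        return distinguishedName.substring(lastRdn + 1, lastEqualChar);
    }

    // managedObjectClass("ManagedElement=RNC-Gbg-1,ENodeBFunction=1") == "ENodeBFunction"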
index 8695524..cda194e 100644 (file)
@@ -28,6 +28,7 @@ import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 
 import lombok.Builder;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
@@ -36,6 +37,7 @@ import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.FilterFactory;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,8 +51,6 @@ public class Job {
     @Schema(name = "job_statistics", description = "Statistics information for one job")
     public static class Statistics {
 
-        // @Schema(name = "jobId", description = "jobId", required = true)
-        // @SerializedName("jobId")
         @JsonProperty(value = "jobId", required = true)
         String jobId;
 
@@ -115,8 +115,18 @@ public class Job {
         @Getter
         private BufferTimeout bufferTimeout;
 
+        @Builder
+        @EqualsAndHashCode
+        public static class KafkaDeliveryInfo {
+            @Getter
+            private String topic;
+
+            @Getter
+            private String bootStrapServers;
+        }
+
         @Getter
-        private String kafkaOutputTopic;
+        private KafkaDeliveryInfo deliveryInfo;
 
         public Filter.Type getFilterType() {
             if (filter == null || filterType == null) {
@@ -197,7 +207,7 @@ public class Job {
                 .groupId(type.getKafkaGroupId()) //
                 .inputTopic(type.getKafkaInputTopic()) //
                 .jobId(id) //
-                .outputTopic(parameters.getKafkaOutputTopic()) //
+                .outputTopic(parameters.getDeliveryInfo() == null ? "" : parameters.getDeliveryInfo().topic) //
                 .typeId(type.getId()) //
                 .clientId(type.getKafkaClientId(appConfig)) //
                 .build();
index f236bbe..9a0559c 100644 (file)
@@ -40,6 +40,7 @@ import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.FilterFactory;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job.Parameters;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
 import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +69,7 @@ public class Jobs {
 
         public Iterable<Job> getJobs();
 
-        public String getTopic();
+        public KafkaDeliveryInfo getDeliveryInfo();
     }
 
     public static class JobGroupSingle implements JobGroup {
@@ -113,14 +114,14 @@ public class Jobs {
         }
 
         @Override
-        public String getTopic() {
-            return this.job.getParameters().getKafkaOutputTopic();
+        public KafkaDeliveryInfo getDeliveryInfo() {
+            return this.job.getParameters().getDeliveryInfo();
         }
     }
 
     public static class JobGroupPm implements JobGroup {
         @Getter
-        private final String topic;
+        private final KafkaDeliveryInfo deliveryInfo;
 
         private Map<String, Job> jobs = new HashMap<>();
 
@@ -130,8 +131,8 @@ public class Jobs {
         @Getter
         private final InfoType type;
 
-        public JobGroupPm(InfoType type, String topic) {
-            this.topic = topic;
+        public JobGroupPm(InfoType type, KafkaDeliveryInfo topic) {
+            this.deliveryInfo = topic;
             this.type = type;
         }
 
@@ -171,7 +172,7 @@ public class Jobs {
 
         @Override
         public String getId() {
-            return topic;
+            return deliveryInfo.getTopic();
         }
 
         @Override
@@ -184,7 +185,7 @@ public class Jobs {
 
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
-    private Map<String, JobGroup> jobGroups = new HashMap<>();
+    private Map<String, JobGroup> jobGroups = new HashMap<>(); // Key is Topic or JobId
     private final AsyncRestClientFactory restclientFactory;
     private final List<Observer> observers = new ArrayList<>();
     private final ApplicationConfig appConfig;
@@ -210,7 +211,7 @@ public class Jobs {
     public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
             Parameters parameters) throws ServiceException {
 
-        if (!Strings.isNullOrEmpty(parameters.getKafkaOutputTopic()) && !Strings.isNullOrEmpty(callbackUrl)) {
+        if ((parameters.getDeliveryInfo() != null) && !Strings.isNullOrEmpty(callbackUrl)) {
             throw new ServiceException("Cannot deliver to both Kafka and HTTP in the same job", HttpStatus.BAD_REQUEST);
         }
         AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
@@ -227,10 +228,10 @@ public class Jobs {
     }
 
     private String jobGroupId(Job job) {
-        if (Strings.isNullOrEmpty(job.getParameters().getKafkaOutputTopic())) {
+        if (job.getParameters().getDeliveryInfo() == null) {
             return job.getId();
         } else if (job.getParameters().getFilterType() == Filter.Type.PM_DATA) {
-            return job.getParameters().getKafkaOutputTopic();
+            return job.getParameters().getDeliveryInfo().getTopic();
         } else {
             return job.getId();
         }
@@ -244,20 +245,20 @@ public class Jobs {
         jobsByType.put(job.getType().getId(), job.getId(), job);
 
         if (job.getParameters().getFilterType() == Filter.Type.PM_DATA
-                && job.getParameters().getKafkaOutputTopic() != null) {
-            String topic = job.getParameters().getKafkaOutputTopic();
-            if (!this.jobGroups.containsKey(topic)) {
-                final JobGroupPm group = new JobGroupPm(job.getType(), topic);
-                this.jobGroups.put(topic, group);
+                && job.getParameters().getDeliveryInfo() != null) {
+            String jobGroupId = jobGroupId(job);
+            if (!this.jobGroups.containsKey(jobGroupId)) {
+                final JobGroupPm group = new JobGroupPm(job.getType(), job.getParameters().getDeliveryInfo());
+                this.jobGroups.put(jobGroupId, group);
                 group.add(job);
                 this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
             } else {
-                JobGroupPm group = (JobGroupPm) this.jobGroups.get(topic);
+                JobGroupPm group = (JobGroupPm) this.jobGroups.get(jobGroupId);
                 group.add(job);
             }
         } else {
             JobGroupSingle group = new JobGroupSingle(job);
-            this.jobGroups.put(job.getId(), group);
+            this.jobGroups.put(jobGroupId(job), group);
             this.observers.forEach(obs -> obs.onJobbGroupAdded(group));
         }
     }
@@ -275,7 +276,7 @@ public class Jobs {
     }
 
     public void remove(Job job) {
-        String groupId = this.jobGroupId(job);
+        String groupId = jobGroupId(job);
         JobGroup group = this.jobGroups.get(groupId);
         synchronized (this) {
             this.allJobs.remove(job.getId());
index c0b1d16..24730a1 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.repository.Job.Parameters.KafkaDeliveryInfo;
 import org.oran.dmaapadapter.repository.Jobs.JobGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,20 +52,20 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
     public KafkaJobDataDistributor(JobGroup jobGroup, ApplicationConfig appConfig) {
         super(jobGroup, appConfig);
 
-        SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
+        SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig, jobGroup.getDeliveryInfo());
         this.sender = KafkaSender.create(senderOptions);
     }
 
     @Override
     protected Mono<String> sendToClient(Filter.FilteredData data) {
 
-        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getTopic());
+        SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, this.getJobGroup().getDeliveryInfo());
 
         logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
-                this.getJobGroup().getTopic());
+                this.getJobGroup().getDeliveryInfo());
 
         return this.sender.send(Mono.just(senderRecord)) //
-                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getTopic())) //
+                .doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", this.getJobGroup().getDeliveryInfo())) //
                 .doOnError(t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", this.getJobGroup().getId(),
                         t.getMessage())) //
                 .onErrorResume(t -> Mono.empty()) //
@@ -82,8 +83,13 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         }
     }
 
-    private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config) {
-        String bootstrapServers = config.getKafkaBootStrapServers();
+    private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config,
+            KafkaDeliveryInfo deliveryInfo) {
+
+        String bootstrapServers = deliveryInfo.getBootStrapServers();
+        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+            bootstrapServers = config.getKafkaBootStrapServers();
+        }
 
         Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -93,9 +99,11 @@ public class KafkaJobDataDistributor extends JobDataDistributor {
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, String topic) {
+    private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output,
+            KafkaDeliveryInfo deliveryInfo) {
         int correlationMetadata = 2;
-        var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
+        var producerRecord =
+                new ProducerRecord<>(deliveryInfo.getTopic(), null, null, output.key, output.value, output.headers());
         return SenderRecord.create(producerRecord, correlationMetadata);
     }
 
index 56180d7..38035c6 100644 (file)
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import lombok.Getter;
 
-import org.apache.logging.log4j.util.Strings;
 import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -94,7 +93,7 @@ public class TopicListeners {
     }
 
     private JobDataDistributor createDistributor(JobGroup jobGroup) {
-        return !Strings.isEmpty(jobGroup.getTopic()) ? new KafkaJobDataDistributor(jobGroup, appConfig)
+        return jobGroup.getDeliveryInfo() != null ? new KafkaJobDataDistributor(jobGroup, appConfig)
                 : new HttpJobDataDistributor(jobGroup, appConfig);
     }
 
index be2829f..c57d066 100644 (file)
       "type": "integer",
       "minimum": 1
     },
-    "kafkaOutputTopic" : {
-      "type": "string"
+    "deliveryInfo": {
+      "type": "object",
+      "properties": {
+        "topic": {
+          "type": "string"
+        },
+        "bootStrapServers": {
+          "type": "string"
+        }
+      },
+      "required": [
+        "topic"
+      ]
     },
     "bufferTimeout": {
       "type": "object",
@@ -41,4 +52,4 @@
     }
   },
   "additionalProperties": false
-}
+}
\ No newline at end of file
index df3f723..2e14ede 100644 (file)
                   }
                ]
             },
-            "measObjClass": {
+            "measTypeSpecs": {
                "type": "array",
                "items": [
                   {
-                     "type": "string"
-                  }
-               ]
-            },
-            "measTypes": {
-               "type": "array",
-               "items": [
-                  {
-                     "type": "string"
+                     "type": "object",
+                     "properties": {
+                        "measuredObjClass": {
+                           "type": "string"
+                        },
+                        "measTypes": {
+                           "type": "array",
+                           "items": [
+                              {
+                                 "type": "string"
+                              }
+                           ]
+                        }
+                     },
+                     "required": [
+                        "measuredObjClass"
+                     ]
                   }
                ]
             },
             "pmdata"
          ]
       },
-      "kafkaOutputTopic": {
-         "type": "string"
+      "deliveryInfo": {
+         "type": "object",
+         "additionalProperties": false,
+         "properties": {
+            "topic": {
+               "type": "string"
+            },
+            "bootStrapServers": {
+               "type": "string"
+            }
+         },
+         "required": [
+            "topic"
+         ]
       }
-   }
+   },
+   "required": [
+      "filter",
+      "filterType"
+   ]
 }
\ No newline at end of file
index 10779d0..da64e0c 100644 (file)
@@ -40,7 +40,9 @@ import java.util.Map;
 import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.clients.SecurityContext;
@@ -75,11 +77,13 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.TestPropertySource;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+@TestMethodOrder(MethodOrderer.MethodName.class)
 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
@@ -115,7 +119,7 @@ class ApplicationTest {
     @Autowired
     private SecurityContext securityContext;
 
-    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
 
     @LocalServerPort
     int localServerHttpPort;
@@ -383,7 +387,7 @@ class ApplicationTest {
         // Create a job with a PM filter
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
 
-        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
         filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
         filterData.getSourceNames().add("O-DU-1122");
         filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
@@ -559,7 +563,7 @@ class ApplicationTest {
 
         // Create a job with a PM filter
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("succImmediateAssignProcs");
+        filterData.addMeasTypes("ManagedElement", "succImmediateAssignProcs");
         Job.Parameters param =
                 Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
 
@@ -601,7 +605,27 @@ class ApplicationTest {
         String stats = restClient().get(targetUri).block();
 
         assertThat(stats).contains(JOB_ID, "DmaapInformationType");
+    }
+
+    @Test
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    void testZZActuator() throws Exception {
        // This test must run last, hence the "ZZ" in the name: it shuts the
        // service down, so any test running after it would fail.
+        AsyncRestClient client = restClient();
+        client.post("/actuator/loggers/org.oran.dmaapadapter", "{\"configuredLevel\":\"trace\"}").block();
+        String resp = client.get("/actuator/loggers/org.oran.dmaapadapter").block();
+        assertThat(resp).contains("TRACE");
+        client.post("/actuator/loggers/org.springframework.boot.actuate", "{\"configuredLevel\":\"trace\"}").block();
        // This stops the web server; any subsequent test would fail.
+        client.post("/actuator/shutdown", "").block();
+        Thread.sleep(1000);
 
+        String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
+        StepVerifier.create(restClient().get(url)) // Any call
+                .expectSubscription() //
+                .expectErrorMatches(t -> t instanceof WebClientRequestException) //
+                .verify();
     }
 
     public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
index 7317919..01e8be6 100644 (file)
@@ -336,8 +336,14 @@ class IntegrationWithKafka {
 
     ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
         try {
-            Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
-                    .kafkaOutputTopic(topic).build();
+            Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+                    .topic(topic) //
+                    .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+                    .build();
+            Job.Parameters param = Job.Parameters.builder() //
+                    .filter(filterData) //
+                    .filterType(Job.Parameters.PM_FILTER_TYPE).deliveryInfo(deliveryInfo) //
+                    .build();
 
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
@@ -348,9 +354,13 @@ class IntegrationWithKafka {
         }
     }
 
-    ConsumerJobInfo consumerJobInfoKafka(String topic) {
+    private ConsumerJobInfo consumerJobInfoKafka(String topic) {
         try {
-            Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
+            Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
+                    .topic(topic) //
+                    .bootStrapServers(this.applicationConfig.getKafkaBootStrapServers()) //
+                    .build();
+            Job.Parameters param = Job.Parameters.builder().deliveryInfo(deliveryInfo).build();
             String str = gson.toJson(param);
             Object parametersObj = jsonObject(str);
 
@@ -579,8 +589,8 @@ class IntegrationWithKafka {
         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("pmCounterNumber0");
-        filterData.getMeasObjClass().add("NRCellCU");
+
+        filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
 
         this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
                 restClient());
@@ -630,9 +640,8 @@ class IntegrationWithKafka {
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
         final int NO_OF_COUNTERS = 2;
         for (int i = 0; i < NO_OF_COUNTERS; ++i) {
-            filterData.getMeasTypes().add("pmCounterNumber" + i);
+            filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
         }
-        filterData.getMeasObjClass().add("NRCellCU");
 
         final int NO_OF_JOBS = 150;
 
@@ -699,8 +708,7 @@ class IntegrationWithKafka {
             final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
             final String jobId = "manyJobs_" + i;
             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-            filterData.getMeasTypes().add("pmCounterNumber" + i); // all counters will be added
-            filterData.getMeasObjClass().add("NRCellCU");
+            filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
 
             this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), jobId, restClient());
 
index bb6f145..5f3f69c 100644 (file)
@@ -115,7 +115,7 @@ class PmReportFilterTest {
     void testPmFilterMeasTypes() throws Exception {
 
         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.measTypes.add("succImmediateAssignProcs");
+        filterData.addMeasTypes("UtranCell", "succImmediateAssignProcs");
 
         PmReportFilter filter = new PmReportFilter(filterData);
         String filtered = filterReport(filter);
@@ -125,7 +125,8 @@ class PmReportFilterTest {
 
         // Test that no report is returned if no matching meas types were found
         filterData = new PmReportFilter.FilterData();
-        filterData.measTypes.add("junk");
+        filterData.addMeasTypes("junk", "succImmediateAssignProcs");
+
         filter = new PmReportFilter(filterData);
         filtered = filterReport(filter);
         assertThat(filtered).isEmpty();
@@ -150,7 +151,7 @@ class PmReportFilterTest {
     void testMeasObjClass() throws Exception {
         {
             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-            filterData.measObjClass.add("junk");
+            filterData.addMeasTypes("junk");
             PmReportFilter filter = new PmReportFilter(filterData);
             String filtered = filterReport(filter);
             assertThat(filtered).isEmpty();
@@ -161,12 +162,12 @@ class PmReportFilterTest {
                     new TopicListener.DataFromTopic("typeId", null, null, loadReport().getBytes());
 
             PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
-            utranCellFilter.measObjClass.add("UtranCell");
+            utranCellFilter.addMeasTypes("UtranCell");
             FilteredData filtered = new PmReportFilter(utranCellFilter).filter(data);
             assertThat(filtered.getValueAString()).contains("UtranCell").doesNotContain("ENodeBFunction");
 
             PmReportFilter.FilterData eNodeBFilter = new PmReportFilter.FilterData();
-            eNodeBFilter.measObjClass.add("ENodeBFunction");
+            eNodeBFilter.addMeasTypes("ENodeBFunction");
             filtered = new PmReportFilter(eNodeBFilter).filter(data);
             assertThat(filtered.getValueAString()).contains("ENodeBFunction").doesNotContain("UtranCell");
         }
@@ -212,8 +213,7 @@ class PmReportFilterTest {
         {
 
             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-            filterData.getMeasTypes().add("pmCounterNumber0");
-            filterData.getMeasObjClass().add("NRCellCU");
+            filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
             PmReportFilter filter = new PmReportFilter(filterData);
             DataFromTopic topicData = new DataFromTopic("typeId", null, null, pmReportJson.getBytes());
 
index 1aa97c1..402ffea 100644 (file)
@@ -2,16 +2,11 @@
    "event": {
       "commonEventHeader": {
          "domain": "perf3gpp",
-         "eventId": "9efa1210-f285-455f-9c6a-3a659b1f1882",
-         "sequence": 0,
          "eventName": "perf3gpp_gnb-Ericsson_pmMeasResult",
          "sourceName": "O-DU-1122",
          "reportingEntityName": "",
-         "priority": "Normal",
          "startEpochMicrosec": 951912000000,
          "lastEpochMicrosec": 951912900000,
-         "version": "4.0",
-         "vesEventListenerVersion": "7.1",
          "timeZoneOffset": "+00:00"
       },
       "perf3gppFields": {