Minor changes (Gerrit change ref: 87/9487/2)
author    PatrikBuhr <patrik.buhr@est.tech>
          Fri, 4 Nov 2022 13:10:13 +0000 (14:10 +0100)
committer PatrikBuhr <patrik.buhr@est.tech>
          Sat, 5 Nov 2022 06:40:01 +0000 (07:40 +0100)
Mainly trace logging changes.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Id32809bc4f6f7c4ff7d097b165275c8ec6d6bf93

src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
src/main/java/org/oran/dmaapadapter/filter/PmReport.java
src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java

diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
index f17cd9a..5cf7c86 100644
@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
 import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -164,7 +165,7 @@ class S3ObjectStore implements DataStore {
 
         return Mono.fromFuture(future) //
                 .map(f -> s3Bucket) //
-                .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+                .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
                 .onErrorResume(t -> Mono.just(s3Bucket));
     }
 
@@ -246,10 +247,10 @@ class S3ObjectStore implements DataStore {
                 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
 
         return Mono.fromFuture(future) //
-                .map(b -> b.asByteArray()) //
+                .map(BytesWrapper::asByteArray) //
                 .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket,
                         t.getMessage())) //
-                .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+                .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
                 .onErrorResume(t -> Mono.empty());
     }
 
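Note on the doOnEach -> doOnNext change above: doOnEach fires for every Reactor signal (onNext, onError, onComplete), so the old trace logged "Read file from S3" twice per successful read and even on failure, while doOnNext fires only for emitted values. A minimal standalone sketch of the difference:

    import reactor.core.publisher.Mono;

    public class DoOnEachVsDoOnNext {
        public static void main(String[] args) {
            // doOnEach is invoked for every signal, so even an empty Mono
            // triggers it once (for onComplete); doOnNext never fires here.
            Mono.empty()
                    .doOnEach(signal -> System.out.println("doOnEach: " + signal.getType()))
                    .doOnNext(value -> System.out.println("doOnNext: " + value))
                    .subscribe();
            // Prints only "doOnEach: onComplete".
        }
    }
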
diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java
index 0eefed3..bd8b5be 100644
@@ -93,6 +93,13 @@ public class PmReport {
 
         @Expose
         String sValue = "";
+
+        public MeasResult copy() {
+            MeasResult c = new MeasResult();
+            c.p = this.p;
+            c.sValue = this.sValue;
+            return c;
+        }
     }
 
     public static class MeasValuesList {
diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java
index e602c1c..c8a575b 100644
@@ -160,7 +160,7 @@ public class PmReportFilter implements Filter {
 
         for (PmReport.MeasResult measResult : oldMeasResults) {
             if (isMeasResultMatch(measResult, measTypes, filter)) {
-                newMeasResults.add(measResult);
+                newMeasResults.add(measResult.copy());
             }
         }
         return newMeasResults;
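
Note: the new MeasResult.copy() above is what makes this filter change safe. The parsed PmReport is cached and may be shared between jobs, so each filtered result list must hold copies rather than the shared instances. A minimal standalone sketch (own demo class, not the production one) of why the defensive copy matters:

    public class CopyDemo {
        static class MeasResult {
            String p = "1";
            String sValue = "900";
            MeasResult copy() {
                MeasResult c = new MeasResult();
                c.p = this.p;
                c.sValue = this.sValue;
                return c;
            }
        }

        public static void main(String[] args) {
            MeasResult cached = new MeasResult();
            MeasResult filtered = cached.copy(); // as in PmReportFilter above
            filtered.sValue = "0";               // mutate only the job's own copy
            System.out.println(cached.sValue);   // prints 900: shared report intact
        }
    }
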
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
index ef5ab2a..259edbf 100644
@@ -31,13 +31,11 @@ import lombok.Getter;
 
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.datastore.DataStore;
-import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.Filter;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -97,32 +95,30 @@ public abstract class JobDataDistributor {
         this.errorStats.resetIrrecoverableErrors();
     }
 
-    static class LockedException extends ServiceException {
-        public LockedException(String file) {
-            super(file, HttpStatus.NOT_FOUND);
-        }
-    }
-
     public void start(Flux<TopicListener.DataFromTopic> input) {
+        logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(),
+                job.getParameters().getKafkaOutputTopic());
         PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
 
         if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
-            this.subscription = Flux.just(input) //
-                    .flatMap(in -> filterAndBuffer(in, this.job)) //
+            this.subscription = filterAndBuffer(input, this.job) //
                     .flatMap(this::sendToClient) //
                     .onErrorResume(this::handleError) //
                     .subscribe(this::handleSentOk, //
                             this::handleExceptionInStream, //
-                            () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
+                            () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId()));
         }
 
         if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
             this.dataStore.createLock(collectHistoricalDataLockName()) //
-                    .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted)
-                            : Mono.error(new LockedException(collectHistoricalDataLockName()))) //
-                    .doOnNext(n -> logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId())) //
-                    .doOnError(t -> logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
-                            this.job.getId())) //
+                    .doOnNext(isLockGranted -> {
+                        if (isLockGranted.booleanValue()) {
+                            logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId());
+                        } else {
+                            logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
+                                    this.job.getId());
+                        }
+                    }).filter(isLockGranted -> isLockGranted) //
                     .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
                     .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
                             this.job.getId())) //
@@ -160,14 +156,9 @@ public abstract class JobDataDistributor {
     }
 
     private Mono<String> handleCollectHistoricalDataError(Throwable t) {
-        if (t instanceof LockedException) {
-            logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
-            return Mono.empty(); // Ignore
-        } else {
-            logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
-            return tryDeleteLockFile() //
-                    .map(bool -> "OK");
-        }
+        logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
+        return tryDeleteLockFile() //
+                .map(bool -> "OK");
     }
 
     private String collectHistoricalDataLockName() {
@@ -260,11 +251,14 @@ public abstract class JobDataDistributor {
 
     private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
         Flux<Filter.FilteredData> filtered = //
-                inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+                inputFlux //
+                        .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) //
+                        .doOnNext(data -> job.getStatistics().received(data.value)) //
                         .map(job::filter) //
                         .map(this::gzip) //
                         .filter(f -> !f.isEmpty()) //
-                        .doOnNext(f -> job.getStatistics().filtered(f.value)); //
+                        .doOnNext(f -> job.getStatistics().filtered(f.value)) //
+                        .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); //
 
         if (job.isBuffered()) {
             filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
@@ -289,6 +283,7 @@ public abstract class JobDataDistributor {
         logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
         this.errorStats.handleException(t);
         if (this.errorStats.isItHopeless()) {
+            logger.error("Giving up: {} job: {}", t.getMessage(), job.getId());
             return Mono.error(t);
         } else {
             return Mono.empty(); // Ignore
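
Note on the lock handling rework above: an ungranted lock is no longer signalled as a LockedException that the error handler must special-case; filter(isLockGranted -> isLockGranted) simply ends the chain as empty, so handleCollectHistoricalDataError can treat every remaining error as real. A minimal sketch of the pattern, with a hypothetical createLock stand-in:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class LockFilterSketch {
        // Hypothetical stand-in for dataStore.createLock(..): true if the lock was won.
        static Mono<Boolean> createLock(String name) {
            return Mono.just(Boolean.TRUE);
        }

        public static void main(String[] args) {
            createLock("historicalDataLock")
                    .filter(isLockGranted -> isLockGranted)             // empty Mono when not granted
                    .flatMapMany(ok -> Flux.just("sourceA", "sourceB")) // runs only for the lock holder
                    .subscribe(System.out::println);
        }
    }
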
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 54262d7..52bcb44 100644
@@ -94,9 +94,8 @@ public class KafkaTopicListener implements TopicListener {
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.applicationConfig.getKafkaMaxPollRecords());
 
         return ReceiverOptions.<byte[], byte[]>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
@@ -104,7 +103,7 @@ public class KafkaTopicListener implements TopicListener {
 
     public static Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data, InfoType type,
             DataStore fileStore) {
-        if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length > 1000) {
+        if (type.getDataType() != InfoType.DataType.PM_DATA) {
             return Mono.just(data);
         }
 
@@ -115,7 +114,7 @@ public class KafkaTopicListener implements TopicListener {
                 logger.warn("Ignoring received message: {}", data);
                 return Mono.empty();
             }
-
+            logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic());
             return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
                     .map(bytes -> unzip(bytes, ev.getFilename())) //
                     .map(bytes -> new DataFromTopic(data.key, bytes, false));
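
Note on ENABLE_AUTO_COMMIT_CONFIG above: with auto-commit off, the Kafka client's periodic background commit is disabled and offset handling is left to the reactive receiver. A minimal sketch of the resulting consumer setup, assuming reactor-kafka; servers, group id and topic are placeholders:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import reactor.kafka.receiver.ReceiverOptions;

    public class ConsumerOptionsSketch {
        static ReceiverOptions<byte[], byte[]> options() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "dmaap-adapter");           // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            // No client-side auto-commit; offsets are managed by the receiver.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            return ReceiverOptions.<byte[], byte[]>create(props)
                    .subscription(Collections.singleton("pm-input-topic"));       // placeholder
        }
    }
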
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
index 7b9cbcf..f4a24d2 100644
@@ -156,7 +156,7 @@ public class ProducerRegstrationTask {
                 .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
                         t.getMessage()))
                 .onErrorResume(t -> Mono.just("")) //
-                .doOnNext(n -> logger.info("Created job: {}, type: {}", JOB_ID, type.getInputJobType())) //
+                .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
                 .map(x -> type);
     }
 
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index 8974d5d..ab553b6 100644
@@ -213,7 +213,7 @@ class IntegrationWithKafka {
         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
             this.receivedKafkaOutput = receivedKafkaOutput;
             this.count++;
-            logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+            logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
         }
 
         synchronized String lastKey() {
@@ -409,8 +409,8 @@ class IntegrationWithKafka {
             noOfSentBytes += s.getNoOfSentBytes();
             noOfSentObjs += s.getNoOfSentObjects();
         }
-        logger.error("  Stats noOfSentBytes: {}, noOfSentObjects: {}, noOfTopics: {}", noOfSentBytes, noOfSentObjs,
-                stats.jobStatistics.size());
+        logger.error("  Stats noOfSentBytes (total): {}, noOfSentObjects (total): {}, noOfTopics: {}", noOfSentBytes,
+                noOfSentObjs, stats.jobStatistics.size());
     }
 
     private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
@@ -537,7 +537,7 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 50;
+        final int NO_OF_OBJECTS = 10;
 
         Instant startTime = Instant.now();
 
@@ -552,13 +552,15 @@ class IntegrationWithKafka {
         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
         sendDataToKafka(dataToSend);
 
-        while (kafkaReceiver.count != NO_OF_OBJECTS) {
+        while (kafkaReceiver.count < NO_OF_OBJECTS) {
             logger.info("sleeping {}", kafkaReceiver.count);
             Thread.sleep(1000 * 1);
         }
 
         printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
+
+        assertThat(kafkaReceiver.count).isEqualTo(NO_OF_OBJECTS);
     }
 
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@@ -618,7 +620,7 @@ class IntegrationWithKafka {
 
         for (KafkaReceiver receiver : receivers) {
             if (receiver.count != NO_OF_OBJECTS) {
-                System.out.println("** Unexpected" + receiver.OUTPUT_TOPIC + " " + receiver.count);
+                System.out.println("** Unexpected no of jobs: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
             }
         }
     }
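
Note on the polling loop change above (kafkaReceiver.count < NO_OF_OBJECTS): with !=, a duplicated delivery that pushes the count past the target would make the loop spin forever; < terminates, and the added assertion then reports any overshoot. A minimal standalone sketch of the same pattern:

    import java.util.concurrent.atomic.AtomicInteger;

    public class PollUntilSketch {
        static final AtomicInteger count = new AtomicInteger(); // stands in for kafkaReceiver.count

        public static void main(String[] args) throws InterruptedException {
            final int NO_OF_OBJECTS = 10;
            new Thread(() -> {
                for (int i = 0; i < NO_OF_OBJECTS; i++) {
                    count.incrementAndGet();
                }
            }).start();
            while (count.get() < NO_OF_OBJECTS) { // '<' cannot hang on overshoot
                Thread.sleep(100);
            }
            if (count.get() != NO_OF_OBJECTS) {   // explicit check catches duplicates
                throw new AssertionError("unexpected count: " + count.get());
            }
        }
    }
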
diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java
index 6dd185c..b32d6d1 100644
@@ -93,9 +93,20 @@ class PmReportFilterTest {
 
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+    private static Gson gson = new GsonBuilder() //
+            .disableHtmlEscaping() //
+            .create(); //
+
     private String filterReport(PmReportFilter filter) throws Exception {
+
         TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
         FilteredData filtered = filter.filter(data);
+
+        String reportAfterFilter = gson.toJson(data.getCachedPmReport());
+        String reportBeforeFilter = gson.toJson(gson.fromJson(loadReport(), PmReport.class));
+
+        assertThat(reportAfterFilter).isEqualTo(reportBeforeFilter);
+
         return filtered.getValueAString();
     }
 
@@ -176,10 +187,6 @@ class PmReportFilterTest {
 
     // @Test
     void testSomeCharacteristics() throws Exception {
-        Gson gson = new GsonBuilder() //
-                .disableHtmlEscaping() //
-                .create(); //
-
         String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
 
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
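
Note on the new assertion in filterReport above: it round-trips the cached report through Gson and compares the JSON with a fresh parse of the original file, verifying that filtering (thanks to MeasResult.copy()) leaves the shared cached report untouched. A minimal standalone sketch of this JSON-based structural comparison, with a hypothetical Report class:

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;

    public class GsonCompareSketch {
        static class Report {
            String name = "A";
        }

        public static void main(String[] args) {
            Gson gson = new GsonBuilder().disableHtmlEscaping().create();
            String original = "{\"name\":\"A\"}";
            Report cached = gson.fromJson(original, Report.class);
            // Serializing both sides gives structural equality without equals().
            String after = gson.toJson(cached);
            String before = gson.toJson(gson.fromJson(original, Report.class));
            System.out.println(after.equals(before)); // true while 'cached' is unmutated
        }
    }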