From: PatrikBuhr Date: Fri, 4 Nov 2022 13:10:13 +0000 (+0100) Subject: Minor changes X-Git-Tag: 1.2.0~3 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F87%2F9487%2F2;p=nonrtric%2Fplt%2Fdmaapadapter.git Minor changes Mainly traces. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: Id32809bc4f6f7c4ff7d097b165275c8ec6d6bf93 --- diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java index f17cd9a..5cf7c86 100644 --- a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java +++ b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java @@ -32,6 +32,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.BytesWrapper; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; @@ -164,7 +165,7 @@ class S3ObjectStore implements DataStore { return Mono.fromFuture(future) // .map(f -> s3Bucket) // - .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) + .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage())) .onErrorResume(t -> Mono.just(s3Bucket)); } @@ -246,10 +247,10 @@ class S3ObjectStore implements DataStore { s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); return Mono.fromFuture(future) // - .map(b -> b.asByteArray()) // + .map(BytesWrapper::asByteArray) // .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, t.getMessage())) // - .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // + .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // .onErrorResume(t -> Mono.empty()); } diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java index 0eefed3..bd8b5be 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReport.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReport.java @@ -93,6 +93,13 @@ public class PmReport { @Expose String sValue = ""; + + public MeasResult copy() { + MeasResult c = new MeasResult(); + c.p = this.p; + c.sValue = this.sValue; + return c; + } } public static class MeasValuesList { diff --git a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java index e602c1c..c8a575b 100644 --- a/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/filter/PmReportFilter.java @@ -160,7 +160,7 @@ public class PmReportFilter implements Filter { for (PmReport.MeasResult measResult : oldMeasResults) { if (isMeasResultMatch(measResult, measTypes, filter)) { - newMeasResults.add(measResult); + newMeasResults.add(measResult.copy()); } } return newMeasResults; diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index ef5ab2a..259edbf 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -31,13 +31,11 @@ import lombok.Getter; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.datastore.DataStore; -import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.Filter; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.Disposable; @@ -97,32 +95,30 @@ public abstract class JobDataDistributor { this.errorStats.resetIrrecoverableErrors(); } - static class LockedException extends ServiceException { - public LockedException(String file) { - super(file, HttpStatus.NOT_FOUND); - } - } - public void start(Flux input) { + logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(), + job.getParameters().getKafkaOutputTopic()); PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; if (filter == null || filter.getFilterData().getPmRopEndTime() == null) { - this.subscription = Flux.just(input) // - .flatMap(in -> filterAndBuffer(in, this.job)) // + this.subscription = filterAndBuffer(input, this.job) // .flatMap(this::sendToClient) // .onErrorResume(this::handleError) // .subscribe(this::handleSentOk, // this::handleExceptionInStream, // - () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId())); + () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId())); } if (filter != null && filter.getFilterData().getPmRopStartTime() != null) { this.dataStore.createLock(collectHistoricalDataLockName()) // - .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted) - : Mono.error(new LockedException(collectHistoricalDataLockName()))) // - .doOnNext(n -> logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId())) // - .doOnError(t -> logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}", - this.job.getId())) // + .doOnNext(isLockGranted -> { + if (isLockGranted.booleanValue()) { + logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId()); + } else { + logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}", + this.job.getId()); + } + }).filter(isLockGranted -> isLockGranted) // .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) // .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName, this.job.getId())) // @@ -160,14 +156,9 @@ public abstract class JobDataDistributor { } private Mono handleCollectHistoricalDataError(Throwable t) { - if (t instanceof LockedException) { - logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId()); - return Mono.empty(); // Ignore - } else { - logger.error("Exception: {} job: {}", t.getMessage(), job.getId()); - return tryDeleteLockFile() // - .map(bool -> "OK"); - } + logger.error("Exception: {} job: {}", t.getMessage(), job.getId()); + return tryDeleteLockFile() // + .map(bool -> "OK"); } private String collectHistoricalDataLockName() { @@ -260,11 +251,14 @@ public abstract class JobDataDistributor { private Flux filterAndBuffer(Flux inputFlux, Job job) { Flux filtered = // - inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) // + inputFlux // + .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) // + .doOnNext(data -> job.getStatistics().received(data.value)) // .map(job::filter) // .map(this::gzip) // .filter(f -> !f.isEmpty()) // - .doOnNext(f -> job.getStatistics().filtered(f.value)); // + .doOnNext(f -> job.getStatistics().filtered(f.value)) // + .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); // if (job.isBuffered()) { filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) // @@ -289,6 +283,7 @@ public abstract class JobDataDistributor { logger.warn("exception: {} job: {}", t.getMessage(), job.getId()); this.errorStats.handleException(t); if (this.errorStats.isItHopeless()) { + logger.error("Giving up: {} job: {}", t.getMessage(), job.getId()); return Mono.error(t); } else { return Mono.empty(); // Ignore diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 54262d7..52bcb44 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -94,9 +94,8 @@ public class KafkaTopicListener implements TopicListener { consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.applicationConfig.getKafkaMaxPollRecords()); return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); @@ -104,7 +103,7 @@ public class KafkaTopicListener implements TopicListener { public static Mono getDataFromFileIfNewPmFileEvent(DataFromTopic data, InfoType type, DataStore fileStore) { - if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length > 1000) { + if (type.getDataType() != InfoType.DataType.PM_DATA) { return Mono.just(data); } @@ -115,7 +114,7 @@ public class KafkaTopicListener implements TopicListener { logger.warn("Ignoring received message: {}", data); return Mono.empty(); } - + logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic()); return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) // .map(bytes -> unzip(bytes, ev.getFilename())) // .map(bytes -> new DataFromTopic(data.key, bytes, false)); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 7b9cbcf..f4a24d2 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -156,7 +156,7 @@ public class ProducerRegstrationTask { .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(), t.getMessage())) .onErrorResume(t -> Mono.just("")) // - .doOnNext(n -> logger.info("Created job: {}, type: {}", JOB_ID, type.getInputJobType())) // + .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) // .map(x -> type); } diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 8974d5d..ab553b6 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -213,7 +213,7 @@ class IntegrationWithKafka { private void set(TopicListener.DataFromTopic receivedKafkaOutput) { this.receivedKafkaOutput = receivedKafkaOutput; this.count++; - logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput); + logger.debug("*** received data on topic: {}", OUTPUT_TOPIC); } synchronized String lastKey() { @@ -409,8 +409,8 @@ class IntegrationWithKafka { noOfSentBytes += s.getNoOfSentBytes(); noOfSentObjs += s.getNoOfSentObjects(); } - logger.error(" Stats noOfSentBytes: {}, noOfSentObjects: {}, noOfTopics: {}", noOfSentBytes, noOfSentObjs, - stats.jobStatistics.size()); + logger.error(" Stats noOfSentBytes (total): {}, noOfSentObjects (total): {}, noOfTopics: {}", noOfSentBytes, + noOfSentObjs, stats.jobStatistics.size()); } private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) { @@ -537,7 +537,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); waitForKafkaListener(); - final int NO_OF_OBJECTS = 50; + final int NO_OF_OBJECTS = 10; Instant startTime = Instant.now(); @@ -552,13 +552,15 @@ class IntegrationWithKafka { var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID)); sendDataToKafka(dataToSend); - while (kafkaReceiver.count != NO_OF_OBJECTS) { + while (kafkaReceiver.count < NO_OF_OBJECTS) { logger.info("sleeping {}", kafkaReceiver.count); Thread.sleep(1000 * 1); } printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS); logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); + + assertThat(kafkaReceiver.count).isEqualTo(NO_OF_OBJECTS); } @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @@ -618,7 +620,7 @@ class IntegrationWithKafka { for (KafkaReceiver receiver : receivers) { if (receiver.count != NO_OF_OBJECTS) { - System.out.println("** Unexpected" + receiver.OUTPUT_TOPIC + " " + receiver.count); + System.out.println("** Unexpected no of jobs: " + receiver.OUTPUT_TOPIC + " " + receiver.count); } } } diff --git a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java index 6dd185c..b32d6d1 100644 --- a/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java +++ b/src/test/java/org/oran/dmaapadapter/filter/PmReportFilterTest.java @@ -93,9 +93,20 @@ class PmReportFilterTest { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static Gson gson = new GsonBuilder() // + .disableHtmlEscaping() // + .create(); // + private String filterReport(PmReportFilter filter) throws Exception { + TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false); FilteredData filtered = filter.filter(data); + + String reportAfterFilter = gson.toJson(data.getCachedPmReport()); + String reportBeforeFilter = gson.toJson(gson.fromJson(loadReport(), PmReport.class)); + + assertThat(reportAfterFilter).isEqualTo(reportBeforeFilter); + return filtered.getValueAString(); } @@ -176,10 +187,6 @@ class PmReportFilterTest { // @Test void testSomeCharacteristics() throws Exception { - Gson gson = new GsonBuilder() // - .disableHtmlEscaping() // - .create(); // - String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json"; String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());