Improved logging, mainly added trace and debug printouts.

In addition: MeasResult objects are copied when building filtered reports
so that the cached original report is not modified, Kafka auto commit is
disabled for the consumer, and the lock handling for collection of
historical PM ROP files is simplified.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Id32809bc4f6f7c4ff7d097b165275c8ec6d6bf93
import reactor.core.publisher.Mono;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
return Mono.fromFuture(future) //
.map(f -> s3Bucket) //
- .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+ .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
.onErrorResume(t -> Mono.just(s3Bucket));
}
s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
return Mono.fromFuture(future) //
- .map(b -> b.asByteArray()) //
+ .map(BytesWrapper::asByteArray) //
.doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket,
t.getMessage())) //
- .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+ .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
.onErrorResume(t -> Mono.empty());
}
@Expose
String sValue = "";
+
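+ // Returns a copy so that filtering can build new measurement results without
+ // modifying the original (cached) report.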
+ public MeasResult copy() {
+ MeasResult c = new MeasResult();
+ c.p = this.p;
+ c.sValue = this.sValue;
+ return c;
+ }
}
public static class MeasValuesList {
for (PmReport.MeasResult measResult : oldMeasResults) {
if (isMeasResultMatch(measResult, measTypes, filter)) {
- newMeasResults.add(measResult);
+ newMeasResults.add(measResult.copy());
}
}
return newMeasResults;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.datastore.DataStore;
-import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
this.errorStats.resetIrrecoverableErrors();
}
- static class LockedException extends ServiceException {
- public LockedException(String file) {
- super(file, HttpStatus.NOT_FOUND);
- }
- }
-
public void start(Flux<TopicListener.DataFromTopic> input) {
+ logger.debug("Starting distribution, job: {}, to topic: {}", this.job.getId(),
+ job.getParameters().getKafkaOutputTopic());
PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
- this.subscription = Flux.just(input) //
- .flatMap(in -> filterAndBuffer(in, this.job)) //
+ this.subscription = filterAndBuffer(input, this.job) //
.flatMap(this::sendToClient) //
.onErrorResume(this::handleError) //
.subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
- () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
+ () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId()));
}
if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
this.dataStore.createLock(collectHistoricalDataLockName()) //
- .flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted)
- : Mono.error(new LockedException(collectHistoricalDataLockName()))) //
- .doOnNext(n -> logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId())) //
- .doOnError(t -> logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
- this.job.getId())) //
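+ // If the lock is not granted, the check of historical PM ROP files has already
+ // been done and is skipped (the filter below makes the stream empty).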
+ .doOnNext(isLockGranted -> {
+ if (isLockGranted.booleanValue()) {
+ logger.debug("Checking historical PM ROP files, jobId: {}", this.job.getId());
+ } else {
+ logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
+ this.job.getId());
+ }
+ }).filter(isLockGranted -> isLockGranted) //
.flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
.doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
this.job.getId())) //
}
private Mono<String> handleCollectHistoricalDataError(Throwable t) {
- if (t instanceof LockedException) {
- logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
- return Mono.empty(); // Ignore
- } else {
- logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
- return tryDeleteLockFile() //
- .map(bool -> "OK");
- }
+ logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
+ return tryDeleteLockFile() //
+ .map(bool -> "OK");
}
private String collectHistoricalDataLockName() {
private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
Flux<Filter.FilteredData> filtered = //
- inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+ inputFlux //
+ .doOnNext(data -> logger.trace("Received data, job {}", job.getId())) //
+ .doOnNext(data -> job.getStatistics().received(data.value)) //
.map(job::filter) //
.map(this::gzip) //
.filter(f -> !f.isEmpty()) //
- .doOnNext(f -> job.getStatistics().filtered(f.value)); //
+ .doOnNext(f -> job.getStatistics().filtered(f.value)) //
+ .doOnNext(data -> logger.trace("Filtered data, job {}", job.getId())); //
if (job.isBuffered()) {
filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
this.errorStats.handleException(t);
if (this.errorStats.isItHopeless()) {
+ logger.error("Giving up: {} job: {}", t.getMessage(), job.getId());
return Mono.error(t);
} else {
return Mono.empty(); // Ignore
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
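+ // Disable the Kafka client's automatic offset commit.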
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
- consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.applicationConfig.getKafkaMaxPollRecords());
return ReceiverOptions.<byte[], byte[]>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
public static Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data, InfoType type,
DataStore fileStore) {
- if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length > 1000) {
+ if (type.getDataType() != InfoType.DataType.PM_DATA) {
return Mono.just(data);
}
logger.warn("Ignoring received message: {}", data);
return Mono.empty();
}
-
+ logger.trace("Reading PM measurements, type: {}, inputTopic: {}", type.getId(), type.getKafkaInputTopic());
return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
.map(bytes -> unzip(bytes, ev.getFilename())) //
.map(bytes -> new DataFromTopic(data.key, bytes, false));
.doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
t.getMessage()))
.onErrorResume(t -> Mono.just("")) //
- .doOnNext(n -> logger.info("Created job: {}, type: {}", JOB_ID, type.getInputJobType())) //
+ .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
.map(x -> type);
}
private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
this.count++;
- logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+ logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
}
synchronized String lastKey() {
noOfSentBytes += s.getNoOfSentBytes();
noOfSentObjs += s.getNoOfSentObjects();
}
- logger.error(" Stats noOfSentBytes: {}, noOfSentObjects: {}, noOfTopics: {}", noOfSentBytes, noOfSentObjs,
- stats.jobStatistics.size());
+ logger.error(" Stats noOfSentBytes (total): {}, noOfSentObjects (total): {}, noOfTopics: {}", noOfSentBytes,
+ noOfSentObjs, stats.jobStatistics.size());
}
private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 50;
+ final int NO_OF_OBJECTS = 10;
Instant startTime = Instant.now();
var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
sendDataToKafka(dataToSend);
- while (kafkaReceiver.count != NO_OF_OBJECTS) {
+ while (kafkaReceiver.count < NO_OF_OBJECTS) {
logger.info("sleeping {}", kafkaReceiver.count);
Thread.sleep(1000 * 1);
}
printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
+
+ assertThat(kafkaReceiver.count).isEqualTo(NO_OF_OBJECTS);
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
for (KafkaReceiver receiver : receivers) {
if (receiver.count != NO_OF_OBJECTS) {
- System.out.println("** Unexpected" + receiver.OUTPUT_TOPIC + " " + receiver.count);
+ System.out.println("** Unexpected no of jobs: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
}
}
}
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Gson gson = new GsonBuilder() //
+ .disableHtmlEscaping() //
+ .create(); //
+
private String filterReport(PmReportFilter filter) throws Exception {
+
TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
FilteredData filtered = filter.filter(data);
+
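+ // Verify that the filtering did not modify the cached original report.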
+ String reportAfterFilter = gson.toJson(data.getCachedPmReport());
+ String reportBeforeFilter = gson.toJson(gson.fromJson(loadReport(), PmReport.class));
+
+ assertThat(reportAfterFilter).isEqualTo(reportBeforeFilter);
+
return filtered.getValueAString();
}
// @Test
void testSomeCharacteristics() throws Exception {
- Gson gson = new GsonBuilder() //
- .disableHtmlEscaping() //
- .create(); //
-
String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());