From: PatrikBuhr Date: Mon, 6 Dec 2021 14:20:12 +0000 (+0100) Subject: NONRTRIC - Implement DMaaP mediator producer service in Java X-Git-Tag: 1.2.0~10 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;ds=inline;h=342864a6e1822ac77355e170307969c47555728d;hp=fb581ed20f191d6fb1187dcad04af253ff697e8d;p=nonrtric.git NONRTRIC - Implement DMaaP mediator producer service in Java Improved coverage Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: Ia31faebadbc231589039438a73b6f80673d0331e --- diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java index d54ac44c..746fdd75 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -112,16 +112,6 @@ public class AsyncRestClient { return retrieve(traceTag, request); } - public Mono> putForEntity(String uri) { - Object traceTag = createTraceTag(); - logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); - logger.trace("{} PUT body: ", traceTag); - RequestHeadersSpec request = getWebClient() // - .put() // - .uri(uri); - return retrieve(traceTag, request); - } - public Mono put(String uri, String body) { return putForEntity(uri, body) // .map(this::toBody); diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java index 740911d4..b30e28e1 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java @@ -31,16 +31,6 @@ public class ServiceException extends Exception { @Getter private final HttpStatus httpStatus; - public ServiceException(String message) { - super(message); - httpStatus = null; - } - - public ServiceException(String message, Exception originalException) { - super(message, originalException); - httpStatus = null; - } - public ServiceException(String message, HttpStatus httpStatus) { super(message); this.httpStatus = httpStatus; diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 0e7743d4..ec33774b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -35,6 +35,7 @@ import org.oran.dmaapadapter.repository.Job.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; @Component @@ -59,7 +60,7 @@ public class Jobs { public synchronized Job getJob(String id) throws ServiceException { Job job = allJobs.get(id); if (job == null) { - throw new ServiceException("Could not find job: " + id); + throw new ServiceException("Could not find job: " + id, HttpStatus.NOT_FOUND); } return job; } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index c9284b5b..ec3f2b29 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -42,6 +42,7 @@ import org.oran.dmaapadapter.repository.InfoTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -77,14 +78,17 @@ public class ProducerRegstrationTask { } @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) - public void supervisionTask() { - checkRegistration() // + public void runSupervisionTask() { + supervisionTask().subscribe( // + null, // + this::handleRegistrationFailure, // + this::handleRegistrationCompleted); + } + + public Mono supervisionTask() { + return checkRegistration() // .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) // - .flatMap(isRegisterred -> registerTypesAndProducer()) // - .subscribe( // - null, // - this::handleRegistrationFailure, // - this::handleRegistrationCompleted); + .flatMap(isRegisterred -> registerTypesAndProducer()); } private void handleRegistrationCompleted() { @@ -153,7 +157,7 @@ public class ProducerRegstrationTask { InputStream in = getClass().getResourceAsStream(filePath); logger.debug("Reading application schema file from: {} with: {}", filePath, in); if (in == null) { - throw new ServiceException("Could not readfile: " + filePath); + throw new ServiceException("Could not readfile: " + filePath, HttpStatus.INTERNAL_SERVER_ERROR); } return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8)); } diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 0ea00564..8c414234 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -52,6 +52,7 @@ import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer; import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; +import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -98,6 +99,9 @@ class ApplicationTest { @Autowired KafkaTopicConsumers kafkaTopicConsumers; + @Autowired + ProducerRegstrationTask producerRegistrationTask; + private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); @LocalServerPort @@ -288,6 +292,8 @@ class ApplicationTest { // Register producer, Register types await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue(); + producerRegistrationTask.supervisionTask().block(); // Create a job this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());