return retrieve(traceTag, request);
}
- public Mono<ResponseEntity<String>> putForEntity(String uri) {
- Object traceTag = createTraceTag();
- logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
- logger.trace("{} PUT body: <empty>", traceTag);
- RequestHeadersSpec<?> request = getWebClient() //
- .put() //
- .uri(uri);
- return retrieve(traceTag, request);
- }
-
public Mono<String> put(String uri, String body) {
return putForEntity(uri, body) //
.map(this::toBody);
@Getter
private final HttpStatus httpStatus;
- public ServiceException(String message) {
- super(message);
- httpStatus = null;
- }
-
- public ServiceException(String message, Exception originalException) {
- super(message, originalException);
- httpStatus = null;
- }
-
public ServiceException(String message, HttpStatus httpStatus) {
super(message);
this.httpStatus = httpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
@Component
public synchronized Job getJob(String id) throws ServiceException {
Job job = allJobs.get(id);
if (job == null) {
- throw new ServiceException("Could not find job: " + id);
+ throw new ServiceException("Could not find job: " + id, HttpStatus.NOT_FOUND);
}
return job;
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
}
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
- public void supervisionTask() {
- checkRegistration() //
+ public void runSupervisionTask() {
+ supervisionTask().subscribe( //
+ null, //
+ this::handleRegistrationFailure, //
+ this::handleRegistrationCompleted);
+ }
+
+ public Mono<String> supervisionTask() {
+ return checkRegistration() //
.filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
- .flatMap(isRegisterred -> registerTypesAndProducer()) //
- .subscribe( //
- null, //
- this::handleRegistrationFailure, //
- this::handleRegistrationCompleted);
+ .flatMap(isRegisterred -> registerTypesAndProducer());
}
private void handleRegistrationCompleted() {
InputStream in = getClass().getResourceAsStream(filePath);
logger.debug("Reading application schema file from: {} with: {}", filePath, in);
if (in == null) {
- throw new ServiceException("Could not readfile: " + filePath);
+ throw new ServiceException("Could not readfile: " + filePath, HttpStatus.INTERNAL_SERVER_ERROR);
}
return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
}
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
+import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@Autowired
KafkaTopicConsumers kafkaTopicConsumers;
+ @Autowired
+ ProducerRegstrationTask producerRegistrationTask;
+
private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
@LocalServerPort
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
+ producerRegistrationTask.supervisionTask().block();
// Create a job
this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());