import org.oran.pmproducer.clients.AsyncRestClient;
import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.controllers.ProducerCallbacksController;
import org.oran.pmproducer.exceptions.ServiceException;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.oran.pmproducer.r1.ConsumerJobInfo;
import org.oran.pmproducer.r1.ProducerInfoTypeInfo;
import org.oran.pmproducer.r1.ProducerRegistrationInfo;
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void runSupervisionTask() {
- supervisionTask().subscribe( //
+ if (this.isRegisteredInIcs) {
+ return;
+ }
+ registerTypesAndProducer().subscribe( //
null, //
- this::handleRegistrationFailure, //
- this::handleRegistrationCompleted);
- }
-
- public Mono<String> supervisionTask() {
- return checkRegistration() //
- .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
- .flatMap(isRegisterred -> registerTypesAndProducer());
+ this::handleRegistrationFailure//
+ );
}
private void handleRegistrationCompleted() {
return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
}
- // Returns TRUE if registration is correct
- private Mono<Boolean> checkRegistration() {
- return restClient.get(producerRegistrationUrl()) //
- .flatMap(this::isRegisterredInfoCorrect) //
- .onErrorResume(t -> Mono.just(Boolean.FALSE));
- }
-
- private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
- ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
- if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered in ICS");
- return Mono.just(Boolean.TRUE);
- } else {
- return Mono.just(Boolean.FALSE);
- }
- }
-
private String registerTypeUrl(InfoType type) {
return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
- private Mono<String> registerTypesAndProducer() {
+ public Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 1;
return Flux.fromIterable(this.types.getAll()) //
CONCURRENCY) //
.collectList() //
.doOnNext(type -> logger.info("Registering producer")) //
- .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
+ .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())))
+ .doOnNext(n -> handleRegistrationCompleted());
}
private Mono<InfoType> createInputDataJob(InfoType type) {
return restClient.put(consumerJobUrl(JOB_ID), body)
.doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
t.getMessage()))
- .onErrorResume(t -> Mono.just("")) //
.doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
.map(x -> type);
}