Merge "Added support for using oauth token for Kafka"
authorPatrik Buhr <patrik.buhr@est.tech>
Thu, 23 Mar 2023 13:00:32 +0000 (13:00 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Thu, 23 Mar 2023 13:00:32 +0000 (13:00 +0000)
1  2 
pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java
pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java

@@@ -32,10 -32,10 +32,10 @@@ import lombok.Getter
  
  import org.oran.pmproducer.clients.AsyncRestClient;
  import org.oran.pmproducer.clients.AsyncRestClientFactory;
- import org.oran.pmproducer.clients.SecurityContext;
  import org.oran.pmproducer.configuration.ApplicationConfig;
  import org.oran.pmproducer.controllers.ProducerCallbacksController;
  import org.oran.pmproducer.exceptions.ServiceException;
+ import org.oran.pmproducer.oauth2.SecurityContext;
  import org.oran.pmproducer.r1.ConsumerJobInfo;
  import org.oran.pmproducer.r1.ProducerInfoTypeInfo;
  import org.oran.pmproducer.r1.ProducerRegistrationInfo;
@@@ -82,13 -82,16 +82,13 @@@ public class ProducerRegstrationTask 
  
      @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
      public void runSupervisionTask() {
 -        supervisionTask().subscribe( //
 +        if (this.isRegisteredInIcs) {
 +            return;
 +        }
 +        registerTypesAndProducer().subscribe( //
                  null, //
 -                this::handleRegistrationFailure, //
 -                this::handleRegistrationCompleted);
 -    }
 -
 -    public Mono<String> supervisionTask() {
 -        return checkRegistration() //
 -                .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
 -                .flatMap(isRegisterred -> registerTypesAndProducer());
 +                this::handleRegistrationFailure//
 +        );
      }
  
      private void handleRegistrationCompleted() {
          return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
      }
  
 -    // Returns TRUE if registration is correct
 -    private Mono<Boolean> checkRegistration() {
 -        return restClient.get(producerRegistrationUrl()) //
 -                .flatMap(this::isRegisterredInfoCorrect) //
 -                .onErrorResume(t -> Mono.just(Boolean.FALSE));
 -    }
 -
 -    private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
 -        ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
 -        if (isEqual(producerRegistrationInfo(), registerredInfo)) {
 -            logger.trace("Already registered in ICS");
 -            return Mono.just(Boolean.TRUE);
 -        } else {
 -            return Mono.just(Boolean.FALSE);
 -        }
 -    }
 -
      private String registerTypeUrl(InfoType type) {
          return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
      }
  
 -    private Mono<String> registerTypesAndProducer() {
 +    public Mono<String> registerTypesAndProducer() {
          final int CONCURRENCY = 1;
  
          return Flux.fromIterable(this.types.getAll()) //
                          CONCURRENCY) //
                  .collectList() //
                  .doOnNext(type -> logger.info("Registering producer")) //
 -                .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
 +                .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())))
 +                .doOnNext(n -> handleRegistrationCompleted());
      }
  
      private Mono<InfoType> createInputDataJob(InfoType type) {
          return restClient.put(consumerJobUrl(JOB_ID), body)
                  .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
                          t.getMessage()))
 -                .onErrorResume(t -> Mono.just("")) //
                  .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
                  .map(x -> type);
      }
@@@ -53,7 -53,6 +53,6 @@@ import org.junit.jupiter.api.TestMethod
  import org.mockito.ArgumentCaptor;
  import org.oran.pmproducer.clients.AsyncRestClient;
  import org.oran.pmproducer.clients.AsyncRestClientFactory;
- import org.oran.pmproducer.clients.SecurityContext;
  import org.oran.pmproducer.configuration.ApplicationConfig;
  import org.oran.pmproducer.configuration.WebClientConfig;
  import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
@@@ -64,6 -63,7 +63,7 @@@ import org.oran.pmproducer.filter.Filte
  import org.oran.pmproducer.filter.PmReport;
  import org.oran.pmproducer.filter.PmReportFilter;
  import org.oran.pmproducer.filter.PmReportFilter.FilterData;
+ import org.oran.pmproducer.oauth2.SecurityContext;
  import org.oran.pmproducer.r1.ConsumerJobInfo;
  import org.oran.pmproducer.r1.ProducerJobInfo;
  import org.oran.pmproducer.repository.InfoType;
@@@ -235,11 -235,6 +235,6 @@@ class ApplicationTest 
          return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
      }
  
-     private String quote(String str) {
-         final String q = "\"";
-         return q + str.replace(q, "\\\"") + q;
-     }
      private Object toJson(String json) {
          try {
              return JsonParser.parseString(json).getAsJsonObject();
      }
  
      private void waitForRegistration() {
 +        producerRegistrationTask.registerTypesAndProducer().block();
          // Register producer, Register types
          await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
 -        producerRegistrationTask.supervisionTask().block();
 +        producerRegistrationTask.registerTypesAndProducer().block();
  
          assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
          assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
          await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
      }
  
 -    @Test
 -    void testReRegister() throws Exception {
 -        // Wait foir register types and producer
 -        waitForRegistration();
 -
 -        // Clear the registration, should trigger a re-register
 -        icsSimulatorController.testResults.reset();
 -        producerRegistrationTask.supervisionTask().block();
 -        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
 -        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 -
 -        // Just clear the registerred types, should trigger a re-register
 -        icsSimulatorController.testResults.types.clear();
 -        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds)
 -                .hasSize(this.types.size()));
 -    }
 -
      @Test
      @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
      void testZZActuator() throws Exception {