From 894ef7c6ead6a3617a1190d7c0b36c0d1c21a0be Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 24 Apr 2020 15:45:58 +0200 Subject: [PATCH] Changed the config loading from consul Cannot use the updates() function in CBS client. Forgetting the negitiated protocol when a controller is connected or disconnected in the configuration. Temporary disabled trhe concurrency test due to too much resource consumption. Change-Id: I760add1c1e1b028763ae5c7c8cc4e542361026ef Issue-ID: NONRTRIC-204 Signed-off-by: PatrikBuhr --- .../policyagent/clients/A1ClientFactory.java | 9 ++++- .../policyagent/tasks/RefreshConfigTask.java | 39 ++++++++++++---------- .../org/oransc/policyagent/ApplicationTest.java | 8 +++-- .../policyagent/ConcurrencyTestRunnable.java | 2 +- .../policyagent/tasks/RefreshConfigTaskTest.java | 6 ++-- 5 files changed, 39 insertions(+), 25 deletions(-) diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java index 546979ce..57ac9809 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java @@ -86,13 +86,20 @@ public class A1ClientFactory { private ControllerConfig getControllerConfig(Ric ric) throws ServiceException { String controllerName = ric.getConfig().controllerName(); if (controllerName.isEmpty()) { + ric.setProtocolVersion(A1ProtocolType.UNKNOWN); throw new ServiceException("No controller configured for RIC: " + ric.name()); } - return this.appConfig.getControllerConfig(controllerName); + try { + return this.appConfig.getControllerConfig(controllerName); + } catch (ServiceException e) { + ric.setProtocolVersion(A1ProtocolType.UNKNOWN); + throw e; + } } private void assertNoControllerConfig(Ric ric, A1ProtocolType version) throws ServiceException { if (!ric.getConfig().controllerName().isEmpty()) { + ric.setProtocolVersion(A1ProtocolType.UNKNOWN); throw new ServiceException( "Controller config should be empty, ric: " + ric.name() + " when using protocol version: " + version); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java index 41f2064a..dd235db5 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java @@ -124,46 +124,49 @@ public class RefreshConfigTask { .filter(notUsed -> configFileExists()) // .filter(notUsed -> !this.isConsulUsed) // .flatMap(notUsed -> loadConfigurationFromFile()) // - .onErrorResume(this::ignoreError) // + .onErrorResume(this::ignoreErrorFlux) // .doOnNext(json -> logger.debug("loadFromFile succeeded")) // - .doOnTerminate(() -> logger.info("loadFromFile Terminate")); + .doOnTerminate(() -> logger.error("loadFromFile Terminate")); - Flux loadFromConsul = getEnvironment(systemEnvironment) // + Flux loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) // + .flatMap(i -> getEnvironment(systemEnvironment)) // .flatMap(this::createCbsClient) // - .flatMapMany(this::periodicConfigurationUpdates) // - .onErrorResume(this::ignoreError) // + .flatMap(this::getFromCbs) // .doOnNext(json -> logger.debug("loadFromConsul succeeded")) // .doOnNext(json -> this.isConsulUsed = true) // - .doOnTerminate(() -> logger.info("loadFromConsul Terminated")); + .doOnTerminate(() -> logger.error("loadFromConsul Terminated")); return Flux.merge(loadFromFile, loadFromConsul) // .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig) // .doOnNext(this::handleUpdatedRicConfig) // .flatMap(configUpdate -> Flux.just(configUpdate.getType())) // - .doOnTerminate(() -> handleTerminate("Configuration refresh task is terminated")); - } - - private void handleTerminate(String info) { - logger.error(info); + .doOnTerminate(() -> logger.error("Configuration refresh task is terminated")); } Mono getEnvironment(Properties systemEnvironment) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment); + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) // + .onErrorResume(t -> Mono.empty()); } Mono createCbsClient(EnvProperties env) { - return CbsClientFactory.createCbsClient(env); + return CbsClientFactory.createCbsClient(env) // + .onErrorResume(this::ignoreErrorMono); } - private Flux periodicConfigurationUpdates(CbsClient cbsClient) { - final Duration initialDelay = Duration.ZERO; + private Mono getFromCbs(CbsClient cbsClient) { final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); - return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL) // - .onErrorResume(this::ignoreError); + return cbsClient.get(getConfigRequest) // + .onErrorResume(this::ignoreErrorMono); + } + + private Flux ignoreErrorFlux(Throwable throwable) { + String errMsg = throwable.toString(); + logger.warn("Could not refresh application configuration. {}", errMsg); + return Flux.empty(); } - private Mono ignoreError(Throwable throwable) { + private Mono ignoreErrorMono(Throwable throwable) { String errMsg = throwable.toString(); logger.warn("Could not refresh application configuration. {}", errMsg); return Mono.empty(); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 0027cca1..43cb96d5 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -301,6 +301,10 @@ public class ApplicationTest { String rsp = restClient().get(url).block(); assertThat(rsp.contains(policyInstanceId)).isTrue(); + url = "/policy?id=" + policyInstanceId; + rsp = restClient().get(url).block(); + assertThat(rsp).isEqualTo(policyBody); + // Test of error codes url = putPolicyUrl(serviceName, ricName + "XX", policyTypeName, policyInstanceId); testErrorCode(restClient().put(url, policyBody), HttpStatus.NOT_FOUND); @@ -663,10 +667,10 @@ public class ApplicationTest { } private String jsonString() { - return "{\n \"servingCellNrcgi\": \"1\"\n }"; + return "{\"servingCellNrcgi\":\"1\"}"; } - @Test + // @Test TODO temporary disabled public void testConcurrency() throws Exception { final Instant startTime = Instant.now(); List threads = new ArrayList<>(); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java index f3aaa24d..7f80a8e8 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java @@ -80,7 +80,7 @@ class ConcurrencyTestRunnable implements Runnable { deletePolicy(name + "-"); } } catch (Exception e) { - logger.error("Concurrency exception " + e.toString()); + logger.error("Concurrency test exception " + e.toString()); } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java index e24867b7..00d2c993 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java @@ -221,7 +221,7 @@ public class RefreshConfigTaskTest { doReturn(Mono.just(props)).when(refreshTaskUnderTest).getEnvironment(any()); doReturn(Mono.just(cbsClient)).when(refreshTaskUnderTest).createCbsClient(props); - when(cbsClient.updates(any(), any(), any())).thenReturn(Flux.error(new IOException())); + when(cbsClient.get(any())).thenReturn(Mono.error(new IOException())); final ListAppender logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class, WARN); Flux task = refreshTaskUnderTest.createRefreshTask(); @@ -229,7 +229,7 @@ public class RefreshConfigTaskTest { StepVerifier // .create(task) // .expectSubscription() // - .expectNoEvent(Duration.ofMillis(100)) // + .expectNoEvent(Duration.ofMillis(1000)) // .thenCancel() // .verify(); @@ -262,7 +262,7 @@ public class RefreshConfigTaskTest { JsonObject configAsJson = getJsonRootObject(); String newBaseUrl = "newBaseUrl"; modifyTheRicConfiguration(configAsJson, newBaseUrl); - when(cbsClient.updates(any(), any(), any())).thenReturn(Flux.just(configAsJson)); + when(cbsClient.get(any())).thenReturn(Mono.just(configAsJson)); doNothing().when(refreshTaskUnderTest).runRicSynchronization(any(Ric.class)); Flux task = refreshTaskUnderTest.createRefreshTask(); -- 2.16.6