summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
81bdaff)
Cannot use the updates() function in CBS client.
Forgetting the negitiated protocol when a controller is connected
or disconnected in the configuration.
Temporary disabled trhe concurrency test due
to too much resource consumption.
Change-Id: I760add1c1e1b028763ae5c7c8cc4e542361026ef
Issue-ID: NONRTRIC-204
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
private ControllerConfig getControllerConfig(Ric ric) throws ServiceException {
String controllerName = ric.getConfig().controllerName();
if (controllerName.isEmpty()) {
private ControllerConfig getControllerConfig(Ric ric) throws ServiceException {
String controllerName = ric.getConfig().controllerName();
if (controllerName.isEmpty()) {
+ ric.setProtocolVersion(A1ProtocolType.UNKNOWN);
throw new ServiceException("No controller configured for RIC: " + ric.name());
}
throw new ServiceException("No controller configured for RIC: " + ric.name());
}
- return this.appConfig.getControllerConfig(controllerName);
+ try {
+ return this.appConfig.getControllerConfig(controllerName);
+ } catch (ServiceException e) {
+ ric.setProtocolVersion(A1ProtocolType.UNKNOWN);
+ throw e;
+ }
}
private void assertNoControllerConfig(Ric ric, A1ProtocolType version) throws ServiceException {
if (!ric.getConfig().controllerName().isEmpty()) {
}
private void assertNoControllerConfig(Ric ric, A1ProtocolType version) throws ServiceException {
if (!ric.getConfig().controllerName().isEmpty()) {
+ ric.setProtocolVersion(A1ProtocolType.UNKNOWN);
throw new ServiceException(
"Controller config should be empty, ric: " + ric.name() + " when using protocol version: " + version);
}
throw new ServiceException(
"Controller config should be empty, ric: " + ric.name() + " when using protocol version: " + version);
}
.filter(notUsed -> configFileExists()) //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
.filter(notUsed -> configFileExists()) //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
- .onErrorResume(this::ignoreError) //
+ .onErrorResume(this::ignoreErrorFlux) //
.doOnNext(json -> logger.debug("loadFromFile succeeded")) //
.doOnNext(json -> logger.debug("loadFromFile succeeded")) //
- .doOnTerminate(() -> logger.info("loadFromFile Terminate"));
+ .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
- Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
+ Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) //
+ .flatMap(i -> getEnvironment(systemEnvironment)) //
.flatMap(this::createCbsClient) //
.flatMap(this::createCbsClient) //
- .flatMapMany(this::periodicConfigurationUpdates) //
- .onErrorResume(this::ignoreError) //
+ .flatMap(this::getFromCbs) //
.doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
.doOnNext(json -> this.isConsulUsed = true) //
.doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
.doOnNext(json -> this.isConsulUsed = true) //
- .doOnTerminate(() -> logger.info("loadFromConsul Terminated"));
+ .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig) //
.doOnNext(this::handleUpdatedRicConfig) //
.flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig) //
.doOnNext(this::handleUpdatedRicConfig) //
.flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
- .doOnTerminate(() -> handleTerminate("Configuration refresh task is terminated"));
- }
-
- private void handleTerminate(String info) {
- logger.error(info);
+ .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
}
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
}
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
+ .onErrorResume(t -> Mono.empty());
}
Mono<CbsClient> createCbsClient(EnvProperties env) {
}
Mono<CbsClient> createCbsClient(EnvProperties env) {
- return CbsClientFactory.createCbsClient(env);
+ return CbsClientFactory.createCbsClient(env) //
+ .onErrorResume(this::ignoreErrorMono);
- private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
- final Duration initialDelay = Duration.ZERO;
+ private Mono<JsonObject> getFromCbs(CbsClient cbsClient) {
final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
- return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL) //
- .onErrorResume(this::ignoreError);
+ return cbsClient.get(getConfigRequest) //
+ .onErrorResume(this::ignoreErrorMono);
+ }
+
+ private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
+ String errMsg = throwable.toString();
+ logger.warn("Could not refresh application configuration. {}", errMsg);
+ return Flux.empty();
- private <R> Mono<R> ignoreError(Throwable throwable) {
+ private <R> Mono<R> ignoreErrorMono(Throwable throwable) {
String errMsg = throwable.toString();
logger.warn("Could not refresh application configuration. {}", errMsg);
return Mono.empty();
String errMsg = throwable.toString();
logger.warn("Could not refresh application configuration. {}", errMsg);
return Mono.empty();
String rsp = restClient().get(url).block();
assertThat(rsp.contains(policyInstanceId)).isTrue();
String rsp = restClient().get(url).block();
assertThat(rsp.contains(policyInstanceId)).isTrue();
+ url = "/policy?id=" + policyInstanceId;
+ rsp = restClient().get(url).block();
+ assertThat(rsp).isEqualTo(policyBody);
+
// Test of error codes
url = putPolicyUrl(serviceName, ricName + "XX", policyTypeName, policyInstanceId);
testErrorCode(restClient().put(url, policyBody), HttpStatus.NOT_FOUND);
// Test of error codes
url = putPolicyUrl(serviceName, ricName + "XX", policyTypeName, policyInstanceId);
testErrorCode(restClient().put(url, policyBody), HttpStatus.NOT_FOUND);
}
private String jsonString() {
}
private String jsonString() {
- return "{\n \"servingCellNrcgi\": \"1\"\n }";
+ return "{\"servingCellNrcgi\":\"1\"}";
+ // @Test TODO temporary disabled
public void testConcurrency() throws Exception {
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
public void testConcurrency() throws Exception {
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
deletePolicy(name + "-");
}
} catch (Exception e) {
deletePolicy(name + "-");
}
} catch (Exception e) {
- logger.error("Concurrency exception " + e.toString());
+ logger.error("Concurrency test exception " + e.toString());
doReturn(Mono.just(props)).when(refreshTaskUnderTest).getEnvironment(any());
doReturn(Mono.just(cbsClient)).when(refreshTaskUnderTest).createCbsClient(props);
doReturn(Mono.just(props)).when(refreshTaskUnderTest).getEnvironment(any());
doReturn(Mono.just(cbsClient)).when(refreshTaskUnderTest).createCbsClient(props);
- when(cbsClient.updates(any(), any(), any())).thenReturn(Flux.error(new IOException()));
+ when(cbsClient.get(any())).thenReturn(Mono.error(new IOException()));
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class, WARN);
Flux<Type> task = refreshTaskUnderTest.createRefreshTask();
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(RefreshConfigTask.class, WARN);
Flux<Type> task = refreshTaskUnderTest.createRefreshTask();
StepVerifier //
.create(task) //
.expectSubscription() //
StepVerifier //
.create(task) //
.expectSubscription() //
- .expectNoEvent(Duration.ofMillis(100)) //
+ .expectNoEvent(Duration.ofMillis(1000)) //
.thenCancel() //
.verify();
.thenCancel() //
.verify();
JsonObject configAsJson = getJsonRootObject();
String newBaseUrl = "newBaseUrl";
modifyTheRicConfiguration(configAsJson, newBaseUrl);
JsonObject configAsJson = getJsonRootObject();
String newBaseUrl = "newBaseUrl";
modifyTheRicConfiguration(configAsJson, newBaseUrl);
- when(cbsClient.updates(any(), any(), any())).thenReturn(Flux.just(configAsJson));
+ when(cbsClient.get(any())).thenReturn(Mono.just(configAsJson));
doNothing().when(refreshTaskUnderTest).runRicSynchronization(any(Ric.class));
Flux<Type> task = refreshTaskUnderTest.createRefreshTask();
doNothing().when(refreshTaskUnderTest).runRicSynchronization(any(Ric.class));
Flux<Type> task = refreshTaskUnderTest.createRefreshTask();