Reuse CBS concept in ONAP to fetch configurations from consul.
Issue-ID: NONRTRIC-79
Change-Id: Iff6d651c1140c4ff16eeac37f84d8b4578c43cfe
Signed-off-by: YongchaoWu <yongchao.wu@est.tech>
.classpath
.settings
target
-.checkstyle
\ No newline at end of file
+.checkstyle
+policy-agent.iml
\ No newline at end of file
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
+ <repositories>
+ <repository>
+ <id>onap-releases</id>
+ <name>onap-releases</name>
+ <url>https://nexus.onap.org/content/repositories/releases/</url>
+ </repository>
+ </repositories>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${springfox.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>cbs-client</artifactId>
+ <version>${sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<properties>
<java.version>11</java.version>
<springfox.version>2.8.0</springfox.version>
<immutable.version>2.7.1</immutable.version>
+ <sdk.version>1.1.6</sdk.version>
</properties>
<build>
<plugins>
mvn spotless:apply to rewrite source files use mvn spotless:check to validate
source files -->
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.17</version>
+ <configuration>
+ <skipTests>false</skipTests>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.oransc.policyagent.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@EnableConfigurationProperties
@Value("#{systemEnvironment}")
Properties systemEnvironment;
+ private Disposable refreshConfigTask = null;
+
@NotEmpty
private String filepath;
}
public void initialize() {
+ stop();
loadConfigurationFromFile(this.filepath);
+
+ refreshConfigTask = createRefreshTask() //
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+ }
+
+ Flux<ApplicationConfig> createRefreshTask() {
+ return getEnvironment(systemEnvironment) //
+ .flatMap(this::createCbsClient) //
+ .flatMapMany(this::periodicConfigurationUpdates) //
+ .map(this::parseRicConfigurationfromConsul) //
+ .onErrorResume(this::onErrorResume);
+ }
+
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
}
- Mono<Environment.Variables> getEnvironment(Properties systemEnvironment) {
- return Environment.readEnvironmentVariables(systemEnvironment);
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+ try {
+ ApplicationConfigParser parser = new ApplicationConfigParser();
+ parser.parse(jsonObject);
+ setConfiguration(parser.getRicConfigs());
+
+ } catch (ServiceException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
+ private synchronized void setConfiguration(@NotNull Vector<RicConfig> ricConfigs) {
+ this.ricConfigs = ricConfigs;
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }
}
/**
*/
package org.oransc.policyagent.configuration;
-
import java.util.Optional;
import java.util.Properties;
-
-import org.oransc.policyagent.exceptions.ServiceException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.oransc.policyagent.exceptions.EnvironmentLoaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
-class Environment {
-
- public static class Variables {
-
- public final String consulHost;
- public final Integer consulPort;
- public final String cbsName;
- public final String appName;
-
- public Variables(String consulHost, Integer consulPort, String cbsName, String appName) {
- this.consulHost = consulHost;
- this.consulPort = consulPort;
- this.cbsName = cbsName;
- this.appName = appName;
- }
- }
+class EnvironmentProcessor {
private static final int DEFAULT_CONSUL_PORT = 8500;
- private static final Logger logger = LoggerFactory.getLogger(Environment.class);
+ private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
- private Environment() {
+ private EnvironmentProcessor() {
}
- static Mono<Variables> readEnvironmentVariables(Properties systemEnvironment) {
- logger.trace("Loading system environment variables");
+ static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment) {
+ EnvProperties envProperties;
try {
- Variables envProperties = new Variables(getConsulHost(systemEnvironment) //
- , getConsultPort(systemEnvironment) //
- , getConfigBindingService(systemEnvironment) //
- , getService(systemEnvironment)); //
-
- logger.trace("Evaluated environment system variables {}", envProperties);
- return Mono.just(envProperties);
- } catch (ServiceException e) {
+ envProperties = ImmutableEnvProperties.builder() //
+ .consulHost(getConsulHost(systemEnvironment)) //
+ .consulPort(getConsultPort(systemEnvironment)) //
+ .cbsName(getConfigBindingService(systemEnvironment)) //
+ .appName(getService(systemEnvironment)) //
+ .build();
+ } catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
+ logger.trace("Evaluated environment system variables {}", envProperties);
+ return Mono.just(envProperties);
}
- private static String getConsulHost(Properties systemEnvironments) throws ServiceException {
+ private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
- .orElseThrow(() -> new ServiceException("$CONSUL_HOST environment has not been defined"));
+ .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
}
private static Integer getConsultPort(Properties systemEnvironments) {
return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
- .map(Integer::valueOf) //
- .orElseGet(Environment::getDefaultPortOfConsul);
+ .map(Integer::valueOf) //
+ .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
}
- private static String getConfigBindingService(Properties systemEnvironments) throws ServiceException {
+ private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
- .orElseThrow(() -> new ServiceException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+ .orElseThrow(
+ () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
}
- private static String getService(Properties systemEnvironments) throws ServiceException {
+ private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
return Optional
- .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
- .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
- .orElseThrow(() -> new ServiceException(
- "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+ .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+ .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
}
private static Integer getDefaultPortOfConsul() {
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.exceptions;
+
+public class EnvironmentLoaderException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public EnvironmentLoaderException(String message) {
+ super(message);
+ }
+}
* limitations under the License.
* ========================LICENSE_END===================================
*/
-
package org.oransc.policyagent;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
-
import java.net.URL;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
+
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class ApplicationTest {
}
}
+
@TestConfiguration
static class Beans {
@Bean
private PolicyType addPolicyType(String name) {
PolicyType type = ImmutablePolicyType.builder() //
- .jsonSchema("") //
- .name(name) //
- .build();
+ .jsonSchema("") //
+ .name(name) //
+ .build();
beans.getPolicyTypes().put(type);
return type;
private Policy addPolicy(String id, String typeName, String service) throws ServiceException {
Policy p = ImmutablePolicy.builder().id(id) //
- .json("{}") //
- .ownerServiceName(service) //
- .ric(beans.getRics().getRic("ric1")) //
- .type(addPolicyType(typeName)) //
- .build();
+ .json("{}") //
+ .ownerServiceName(service) //
+ .ric(beans.getRics().getRic("ric1")) //
+ .type(addPolicyType(typeName)) //
+ .build();
beans.getPolicies().put(p);
return p;
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package org.oransc.policyagent.configuration;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doReturn;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.oransc.policyagent.utils.LoggingUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+public class ApplicationConfigTest {
+
+ private ApplicationConfig appConfigUnderTest;
+ CbsClient cbsClient = mock(CbsClient.class);
+
+
+ private static EnvProperties properties() {
+ return ImmutableEnvProperties.builder() //
+ .consulHost("host") //
+ .consulPort(123) //
+ .cbsName("cbsName") //
+ .appName("appName") //
+ .build();
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
+
+ appConfigUnderTest = spy(ApplicationConfig.class);
+ appConfigUnderTest.systemEnvironment = new Properties();
+
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ApplicationConfig.class);
+ Flux<ApplicationConfig> task = appConfigUnderTest.createRefreshTask();
+
+ StepVerifier.create(task)
+ .expectSubscription()
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshNoConsul() {
+ appConfigUnderTest = spy(ApplicationConfig.class);
+ appConfigUnderTest.systemEnvironment = new Properties();
+
+ EnvProperties props = properties();
+ doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any());
+
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ Flux<JsonObject> err = Flux.error(new IOException());
+ doReturn(err).when(cbsClient).updates(any(), any(), any());
+
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ApplicationConfig.class);
+ Flux<ApplicationConfig> task = appConfigUnderTest.createRefreshTask();
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .verifyComplete();
+
+ assertTrue(
+ logAppender.list.toString().contains("Could not refresh application configuration java.io.IOException"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
+ appConfigUnderTest = spy(ApplicationConfig.class);
+ appConfigUnderTest.systemEnvironment = new Properties();
+
+ EnvProperties props = properties();
+ doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any());
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+
+ Flux<JsonObject> json = Flux.just(getJsonRootObject());
+ doReturn(json).when(cbsClient).updates(any(), any(), any());
+
+ Flux<ApplicationConfig> task = appConfigUnderTest.createRefreshTask();
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getRicConfigs());
+ }
+
+ private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
+ JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+ return rootObject;
+ }
+
+ private static InputStream getCorrectJson() throws IOException {
+ URL url = ApplicationConfigParser.class.getClassLoader().getResource("test_application_configuration.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.utils;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+import org.slf4j.LoggerFactory;
+
+public class LoggingUtils {
+
+ /**
+ * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+ */
+ public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass) {
+ return getLogListAppender(logClass, false);
+ }
+
+ /**
+ * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+ *
+ * @param logClass class whose appender is wanted.
+ * @param allLevels true if all log levels should be activated.
+ */
+ public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass, boolean allLevels) {
+ Logger logger = (Logger) LoggerFactory.getLogger(logClass);
+ if (allLevels) {
+ logger.setLevel(Level.ALL);
+ }
+ ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
+ listAppender.start();
+ logger.addAppender(listAppender);
+
+ return listAppender;
+ }
+}