Integrate Config Binding Service client 74/1974/6
authorYongchaoWu <yongchao.wu@est.tech>
Mon, 9 Dec 2019 09:00:46 +0000 (10:00 +0100)
committeryongchao <yongchao.wu@est.tech>
Tue, 10 Dec 2019 08:13:58 +0000 (09:13 +0100)
Reuse CBS concept in ONAP to fetch configurations from consul.

Issue-ID: NONRTRIC-79
Change-Id: Iff6d651c1140c4ff16eeac37f84d8b4578c43cfe
Signed-off-by: YongchaoWu <yongchao.wu@est.tech>
policy-agent/.gitignore
policy-agent/pom.xml
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/EnvironmentProcessor.java [moved from policy-agent/src/main/java/org/oransc/policyagent/configuration/Environment.java with 50% similarity]
policy-agent/src/main/java/org/oransc/policyagent/exceptions/EnvironmentLoaderException.java [new file with mode: 0644]
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java [new file with mode: 0644]
policy-agent/src/test/java/org/oransc/policyagent/utils/LoggingUtils.java [new file with mode: 0644]

index ebf2430..ca6a2f3 100644 (file)
@@ -3,4 +3,5 @@
 .classpath
 .settings
 target
-.checkstyle
\ No newline at end of file
+.checkstyle
+policy-agent.iml
\ No newline at end of file
index 875fb15..06036db 100644 (file)
             <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
         </license>
     </licenses>
+    <repositories>
+        <repository>
+            <id>onap-releases</id>
+            <name>onap-releases</name>
+            <url>https://nexus.onap.org/content/repositories/releases/</url>
+        </repository>
+    </repositories>
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
             <artifactId>springfox-swagger-ui</artifactId>
             <version>${springfox.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>cbs-client</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <properties>
         <java.version>11</java.version>
         <springfox.version>2.8.0</springfox.version>
         <immutable.version>2.7.1</immutable.version>
+        <sdk.version>1.1.6</sdk.version>
     </properties>
     <build>
         <plugins>
                         mvn spotless:apply to rewrite source files use mvn spotless:check to validate 
                         source files -->
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.17</version>
+                <configuration>
+                    <skipTests>false</skipTests>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>
index d8da5ee..04471c8 100644 (file)
@@ -32,6 +32,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.ServiceLoader;
@@ -40,6 +41,12 @@ import java.util.Vector;
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
 
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +54,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @EnableConfigurationProperties
@@ -57,6 +66,8 @@ public class ApplicationConfig {
     @Value("#{systemEnvironment}")
     Properties systemEnvironment;
 
+    private Disposable refreshConfigTask = null;
+
     @NotEmpty
     private String filepath;
 
@@ -93,11 +104,64 @@ public class ApplicationConfig {
     }
 
     public void initialize() {
+        stop();
         loadConfigurationFromFile(this.filepath);
+
+        refreshConfigTask = createRefreshTask() //
+                .subscribe(e -> logger.info("Refreshed configuration data"),
+                        throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+                        () -> logger.error("Configuration refresh terminated"));
+    }
+
+    Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
+        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
+    }
+
+    Flux<ApplicationConfig> createRefreshTask() {
+        return getEnvironment(systemEnvironment) //
+                .flatMap(this::createCbsClient) //
+                .flatMapMany(this::periodicConfigurationUpdates) //
+                .map(this::parseRicConfigurationfromConsul) //
+                .onErrorResume(this::onErrorResume);
+    }
+
+    Mono<CbsClient> createCbsClient(EnvProperties env) {
+        return CbsClientFactory.createCbsClient(env);
+    }
+
+    private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+        final Duration initialDelay = Duration.ZERO;
+        final Duration refreshPeriod = Duration.ofMinutes(1);
+        final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+        return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
     }
 
-    Mono<Environment.Variables> getEnvironment(Properties systemEnvironment) {
-        return Environment.readEnvironmentVariables(systemEnvironment);
+    private <R> Mono<R> onErrorResume(Throwable trowable) {
+        logger.error("Could not refresh application configuration {}", trowable.toString());
+        return Mono.empty();
+    }
+
+    private ApplicationConfig parseRicConfigurationfromConsul(JsonObject jsonObject) {
+        try {
+            ApplicationConfigParser parser = new ApplicationConfigParser();
+            parser.parse(jsonObject);
+            setConfiguration(parser.getRicConfigs());
+
+        } catch (ServiceException e) {
+            logger.error("Could not parse configuration {}", e.toString(), e);
+        }
+        return this;
+    }
+
+    private synchronized void setConfiguration(@NotNull  Vector<RicConfig> ricConfigs) {
+        this.ricConfigs = ricConfigs;
+    }
+
+    public void stop() {
+        if (refreshConfigTask != null) {
+            refreshConfigTask.dispose();
+            refreshConfigTask = null;
+        }
     }
 
     /**
  */
 
 package org.oransc.policyagent.configuration;
-
 import java.util.Optional;
 import java.util.Properties;
-
-import org.oransc.policyagent.exceptions.ServiceException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.oransc.policyagent.exceptions.EnvironmentLoaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
-class Environment {
-
-    public static class Variables {
-
-        public final String consulHost;
-        public final Integer consulPort;
-        public final String cbsName;
-        public final String appName;
-
-        public Variables(String consulHost, Integer consulPort, String cbsName, String appName) {
-            this.consulHost = consulHost;
-            this.consulPort = consulPort;
-            this.cbsName = cbsName;
-            this.appName = appName;
-        }
-    }
+class EnvironmentProcessor {
 
     private static final int DEFAULT_CONSUL_PORT = 8500;
-    private static final Logger logger = LoggerFactory.getLogger(Environment.class);
+    private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
 
-    private Environment() {
+    private EnvironmentProcessor() {
     }
 
-    static Mono<Variables> readEnvironmentVariables(Properties systemEnvironment) {
-        logger.trace("Loading system environment variables");
+    static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment) {
 
+        EnvProperties envProperties;
         try {
-            Variables envProperties = new Variables(getConsulHost(systemEnvironment) //
-                , getConsultPort(systemEnvironment) //
-                , getConfigBindingService(systemEnvironment) //
-                , getService(systemEnvironment)); //
-
-            logger.trace("Evaluated environment system variables {}", envProperties);
-            return Mono.just(envProperties);
-        } catch (ServiceException e) {
+            envProperties = ImmutableEnvProperties.builder() //
+                    .consulHost(getConsulHost(systemEnvironment)) //
+                    .consulPort(getConsultPort(systemEnvironment)) //
+                    .cbsName(getConfigBindingService(systemEnvironment)) //
+                    .appName(getService(systemEnvironment)) //
+                    .build();
+        } catch (EnvironmentLoaderException e) {
             return Mono.error(e);
         }
+        logger.trace("Evaluated environment system variables {}", envProperties);
+        return Mono.just(envProperties);
     }
 
-    private static String getConsulHost(Properties systemEnvironments) throws ServiceException {
+    private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
         return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
-            .orElseThrow(() -> new ServiceException("$CONSUL_HOST environment has not been defined"));
+                .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
     }
 
     private static Integer getConsultPort(Properties systemEnvironments) {
         return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
-            .map(Integer::valueOf) //
-            .orElseGet(Environment::getDefaultPortOfConsul);
+                .map(Integer::valueOf) //
+                .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
     }
 
-    private static String getConfigBindingService(Properties systemEnvironments) throws ServiceException {
+    private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
         return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
-            .orElseThrow(() -> new ServiceException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+                .orElseThrow(
+                        () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
     }
 
-    private static String getService(Properties systemEnvironments) throws ServiceException {
+    private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
         return Optional
-            .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
-                .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
-            .orElseThrow(() -> new ServiceException(
-                "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+                .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+                        .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+                .orElseThrow(() -> new EnvironmentLoaderException(
+                        "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
     }
 
     private static Integer getDefaultPortOfConsul() {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/exceptions/EnvironmentLoaderException.java b/policy-agent/src/main/java/org/oransc/policyagent/exceptions/EnvironmentLoaderException.java
new file mode 100644 (file)
index 0000000..df6f7de
--- /dev/null
@@ -0,0 +1,30 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.exceptions;
+
+public class EnvironmentLoaderException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    public EnvironmentLoaderException(String message) {
+        super(message);
+    }
+}
index beef586..18ea22b 100644 (file)
  * limitations under the License.
  * ========================LICENSE_END===================================
  */
-
 package org.oransc.policyagent;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertFalse;
-
 import java.net.URL;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,6 +44,7 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.web.client.RestTemplate;
 
+
 @RunWith(SpringRunner.class)
 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
 public class ApplicationTest {
@@ -61,6 +60,7 @@ public class ApplicationTest {
         }
     }
 
+
     @TestConfiguration
     static class Beans {
         @Bean
@@ -122,9 +122,9 @@ public class ApplicationTest {
 
     private PolicyType addPolicyType(String name) {
         PolicyType type = ImmutablePolicyType.builder() //
-            .jsonSchema("") //
-            .name(name) //
-            .build();
+                .jsonSchema("") //
+                .name(name) //
+                .build();
 
         beans.getPolicyTypes().put(type);
         return type;
@@ -132,11 +132,11 @@ public class ApplicationTest {
 
     private Policy addPolicy(String id, String typeName, String service) throws ServiceException {
         Policy p = ImmutablePolicy.builder().id(id) //
-            .json("{}") //
-            .ownerServiceName(service) //
-            .ric(beans.getRics().getRic("ric1")) //
-            .type(addPolicyType(typeName)) //
-            .build();
+                .json("{}") //
+                .ownerServiceName(service) //
+                .ric(beans.getRics().getRic("ric1")) //
+                .type(addPolicyType(typeName)) //
+                .build();
         beans.getPolicies().put(p);
         return p;
     }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
new file mode 100644 (file)
index 0000000..0cbb395
--- /dev/null
@@ -0,0 +1,141 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package org.oransc.policyagent.configuration;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doReturn;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.oransc.policyagent.utils.LoggingUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+public class ApplicationConfigTest {
+
+    private ApplicationConfig appConfigUnderTest;
+    CbsClient cbsClient = mock(CbsClient.class);
+
+
+    private static EnvProperties properties() {
+        return ImmutableEnvProperties.builder() //
+                .consulHost("host") //
+                .consulPort(123) //
+                .cbsName("cbsName") //
+                .appName("appName") //
+                .build();
+    }
+
+    @Test
+    public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
+
+        appConfigUnderTest = spy(ApplicationConfig.class);
+        appConfigUnderTest.systemEnvironment = new Properties();
+
+        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ApplicationConfig.class);
+        Flux<ApplicationConfig> task = appConfigUnderTest.createRefreshTask();
+
+        StepVerifier.create(task)
+                .expectSubscription()
+                .verifyComplete();
+
+        assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+    }
+
+    @Test
+    public void whenPeriodicConfigRefreshNoConsul() {
+        appConfigUnderTest = spy(ApplicationConfig.class);
+        appConfigUnderTest.systemEnvironment = new Properties();
+
+        EnvProperties props = properties();
+        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any());
+
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+        Flux<JsonObject> err = Flux.error(new IOException());
+        doReturn(err).when(cbsClient).updates(any(), any(), any());
+
+        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ApplicationConfig.class);
+        Flux<ApplicationConfig> task = appConfigUnderTest.createRefreshTask();
+
+        StepVerifier //
+                .create(task) //
+                .expectSubscription() //
+                .verifyComplete();
+
+        assertTrue(
+                logAppender.list.toString().contains("Could not refresh application configuration java.io.IOException"));
+    }
+
+    @Test
+    public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
+        appConfigUnderTest = spy(ApplicationConfig.class);
+        appConfigUnderTest.systemEnvironment = new Properties();
+
+        EnvProperties props = properties();
+        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any());
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+
+        Flux<JsonObject> json = Flux.just(getJsonRootObject());
+        doReturn(json).when(cbsClient).updates(any(), any(), any());
+
+        Flux<ApplicationConfig> task = appConfigUnderTest.createRefreshTask();
+
+        StepVerifier //
+                .create(task) //
+                .expectSubscription() //
+                .expectNext(appConfigUnderTest) //
+                .verifyComplete();
+
+        Assertions.assertNotNull(appConfigUnderTest.getRicConfigs());
+    }
+
+    private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
+        JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+        return rootObject;
+    }
+
+    private static InputStream getCorrectJson() throws IOException {
+        URL url = ApplicationConfigParser.class.getClassLoader().getResource("test_application_configuration.json");
+        String string = Resources.toString(url, Charsets.UTF_8);
+        return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+    }
+
+}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/LoggingUtils.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/LoggingUtils.java
new file mode 100644 (file)
index 0000000..a822bb3
--- /dev/null
@@ -0,0 +1,56 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.utils;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+import org.slf4j.LoggerFactory;
+
+public class LoggingUtils {
+
+    /**
+     * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+     */
+    public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass) {
+        return getLogListAppender(logClass, false);
+    }
+
+    /**
+     * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+     *
+     * @param logClass class whose appender is wanted.
+     * @param allLevels true if all log levels should be activated.
+     */
+    public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass, boolean allLevels) {
+        Logger logger = (Logger) LoggerFactory.getLogger(logClass);
+        if (allLevels) {
+            logger.setLevel(Level.ALL);
+        }
+        ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
+        listAppender.start();
+        logger.addAppender(listAppender);
+
+        return listAppender;
+    }
+}