Merge "Remove unnecessary stuff from northbound directory of A1 controller"
authorHenrik Andersson <henrik.b.andersson@est.tech>
Mon, 27 Apr 2020 05:46:04 +0000 (05:46 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Mon, 27 Apr 2020 05:46:04 +0000 (05:46 +0000)
policy-agent/Dockerfile
policy-agent/config/application.yaml
policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java
policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/main/resources/keystore.jks
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java

index 26ce531..da9fd44 100644 (file)
@@ -25,7 +25,7 @@ WORKDIR /opt/app/policy-agent
 RUN mkdir -p /var/log/policy-agent
 RUN mkdir -p /opt/app/policy-agent/etc/cert/
 
-EXPOSE 8081
+EXPOSE 8081 8433
 
 ADD /config/* /opt/app/policy-agent/config/
 ADD target/${JAR} /opt/app/policy-agent/policy-agent.jar
index 5416a4d..b128494 100644 (file)
@@ -19,7 +19,14 @@ logging:
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.oransc.policyagent: INFO
   file: /var/log/policy-agent/application.log
-app:
-  filepath: /opt/app/policy-agent/config/application_configuration.json  
 server:
-   port : 8081
+   port : 8433
+   ssl:
+      key-store-type: PKCS12
+      key-store-password: policy_agent
+      key-store: classpath:keystore.jks
+      key-password: policy_agent
+app:
+  filepath: /opt/app/policy-agent/config/application_configuration.json
+
+
index 93e1739..e2874cb 100644 (file)
@@ -22,12 +22,15 @@ package org.oransc.policyagent;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.catalina.connector.Connector;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.repository.Services;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -70,4 +73,19 @@ class BeanFactory {
         return new ObjectMapper();
     }
 
+    @Bean
+    public ServletWebServerFactory servletContainer() {
+        TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
+        tomcat.addAdditionalTomcatConnectors(getHttpConnector());
+        return tomcat;
+    }
+
+    private static Connector getHttpConnector() {
+        Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
+        connector.setScheme("http");
+        connector.setPort(8081);
+        connector.setSecure(false);
+        return connector;
+    }
+
 }
index f0b2ce3..ef1acfc 100644 (file)
 package org.oransc.policyagent.clients;
 
 import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.handler.timeout.WriteTimeoutHandler;
 
 import java.lang.invoke.MethodHandles;
 
+import javax.net.ssl.SSLException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
@@ -45,36 +50,24 @@ import reactor.netty.tcp.TcpClient;
  */
 public class AsyncRestClient {
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-    private final WebClient client;
+    private WebClient webClient = null;
     private final String baseUrl;
 
     public AsyncRestClient(String baseUrl) {
-
-        TcpClient tcpClient = TcpClient.create() //
-            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
-            .doOnConnected(connection -> {
-                connection.addHandler(new ReadTimeoutHandler(10));
-                connection.addHandler(new WriteTimeoutHandler(30));
-            });
-        HttpClient httpClient = HttpClient.from(tcpClient);
-        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
-
-        this.client = WebClient.builder() //
-            .clientConnector(connector) //
-            .baseUrl(baseUrl) //
-            .build();
-
         this.baseUrl = baseUrl;
     }
 
     public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
         logger.debug("POST uri = '{}{}''", baseUrl, uri);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
-        RequestHeadersSpec<?> request = client.post() //
-            .uri(uri) //
-            .contentType(MediaType.APPLICATION_JSON) //
-            .body(bodyProducer, String.class);
-        return retrieve(request);
+        return getWebClient() //
+            .flatMap(client -> {
+                RequestHeadersSpec<?> request = client.post() //
+                    .uri(uri) //
+                    .contentType(MediaType.APPLICATION_JSON) //
+                    .body(bodyProducer, String.class);
+                return retrieve(request);
+            });
     }
 
     public Mono<String> post(String uri, @Nullable String body) {
@@ -84,29 +77,38 @@ public class AsyncRestClient {
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
         logger.debug("POST (auth) uri = '{}{}''", baseUrl, uri);
-        RequestHeadersSpec<?> request = client.post() //
-            .uri(uri) //
-            .headers(headers -> headers.setBasicAuth(username, password)) //
-            .contentType(MediaType.APPLICATION_JSON) //
-            .bodyValue(body);
-        return retrieve(request) //
-            .flatMap(this::toBody);
+        return getWebClient() //
+            .flatMap(client -> {
+                RequestHeadersSpec<?> request = client.post() //
+                    .uri(uri) //
+                    .headers(headers -> headers.setBasicAuth(username, password)) //
+                    .contentType(MediaType.APPLICATION_JSON) //
+                    .bodyValue(body);
+                return retrieve(request) //
+                    .flatMap(this::toBody);
+            });
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
         logger.debug("PUT uri = '{}{}''", baseUrl, uri);
-        RequestHeadersSpec<?> request = client.put() //
-            .uri(uri) //
-            .contentType(MediaType.APPLICATION_JSON) //
-            .bodyValue(body);
-        return retrieve(request);
+        return getWebClient() //
+            .flatMap(client -> {
+                RequestHeadersSpec<?> request = client.put() //
+                    .uri(uri) //
+                    .contentType(MediaType.APPLICATION_JSON) //
+                    .bodyValue(body);
+                return retrieve(request);
+            });
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
         logger.debug("PUT uri = '{}{}''", baseUrl, uri);
-        RequestHeadersSpec<?> request = client.put() //
-            .uri(uri);
-        return retrieve(request);
+        return getWebClient() //
+            .flatMap(client -> {
+                RequestHeadersSpec<?> request = client.put() //
+                    .uri(uri);
+                return retrieve(request);
+            });
     }
 
     public Mono<String> put(String uri, String body) {
@@ -116,8 +118,11 @@ public class AsyncRestClient {
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
         logger.debug("GET uri = '{}{}''", baseUrl, uri);
-        RequestHeadersSpec<?> request = client.get().uri(uri);
-        return retrieve(request);
+        return getWebClient() //
+            .flatMap(client -> {
+                RequestHeadersSpec<?> request = client.get().uri(uri);
+                return retrieve(request);
+            });
     }
 
     public Mono<String> get(String uri) {
@@ -127,8 +132,11 @@ public class AsyncRestClient {
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
         logger.debug("DELETE uri = '{}{}''", baseUrl, uri);
-        RequestHeadersSpec<?> request = client.delete().uri(uri);
-        return retrieve(request);
+        return getWebClient() //
+            .flatMap(client -> {
+                RequestHeadersSpec<?> request = client.delete().uri(uri);
+                return retrieve(request);
+            });
     }
 
     public Mono<String> delete(String uri) {
@@ -160,4 +168,40 @@ public class AsyncRestClient {
         }
     }
 
+    private static SslContext createSslContext() throws SSLException {
+        return SslContextBuilder.forClient() //
+            .trustManager(InsecureTrustManagerFactory.INSTANCE) //
+            .build();
+    }
+
+    private static WebClient createWebClient(String baseUrl, SslContext sslContext) {
+        TcpClient tcpClient = TcpClient.create() //
+            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
+            .secure(c -> c.sslContext(sslContext)) //
+            .doOnConnected(connection -> {
+                connection.addHandler(new ReadTimeoutHandler(10));
+                connection.addHandler(new WriteTimeoutHandler(30));
+            });
+        HttpClient httpClient = HttpClient.from(tcpClient);
+        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
+
+        return WebClient.builder() //
+            .clientConnector(connector) //
+            .baseUrl(baseUrl) //
+            .build();
+    }
+
+    private Mono<WebClient> getWebClient() {
+        if (this.webClient == null) {
+            try {
+                SslContext sslContext = createSslContext();
+                this.webClient = createWebClient(this.baseUrl, sslContext);
+            } catch (SSLException e) {
+                logger.error("Could not create WebClient {}", e.getMessage());
+                return Mono.error(e);
+            }
+        }
+        return Mono.just(this.webClient);
+    }
+
 }
index dd35e5b..ea66053 100644 (file)
@@ -119,9 +119,9 @@ public class ApplicationConfigParser {
             JsonObject ricAsJson = ricElem.getAsJsonObject();
             JsonElement controllerNameElement = ricAsJson.get(CONTROLLER);
             ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
-                .name(ricAsJson.get("name").getAsString()) //
-                .baseUrl(ricAsJson.get("baseUrl").getAsString()) //
-                .managedElementIds(parseManagedElementIds(ricAsJson.get("managedElementIds").getAsJsonArray())) //
+                .name(get(ricAsJson, "name").getAsString()) //
+                .baseUrl(get(ricAsJson, "baseUrl").getAsString()) //
+                .managedElementIds(parseManagedElementIds(get(ricAsJson, "managedElementIds").getAsJsonArray())) //
                 .controllerName(controllerNameElement != null ? controllerNameElement.getAsString() : "") //
                 .build();
             result.add(ricConfig);
@@ -137,10 +137,10 @@ public class ApplicationConfigParser {
         for (JsonElement element : getAsJsonArray(config, CONTROLLER)) {
             JsonObject controllerAsJson = element.getAsJsonObject();
             ImmutableControllerConfig controllerConfig = ImmutableControllerConfig.builder() //
-                .name(controllerAsJson.get("name").getAsString()) //
-                .baseUrl(controllerAsJson.get("baseUrl").getAsString()) //
-                .password(controllerAsJson.get("password").getAsString()) //
-                .userName(controllerAsJson.get("userName").getAsString()) // )
+                .name(get(controllerAsJson, "name").getAsString()) //
+                .baseUrl(get(controllerAsJson, "baseUrl").getAsString()) //
+                .password(get(controllerAsJson, "password").getAsString()) //
+                .userName(get(controllerAsJson, "userName").getAsString()) // )
                 .build();
 
             if (result.put(controllerConfig.name(), controllerConfig) != null) {
@@ -164,7 +164,7 @@ public class ApplicationConfigParser {
     private static JsonElement get(JsonObject obj, String memberName) throws ServiceException {
         JsonElement elem = obj.get(memberName);
         if (elem == null) {
-            throw new ServiceException("Could not find member: " + memberName + " in: " + obj);
+            throw new ServiceException("Could not find member: '" + memberName + "' in: " + obj);
         }
         return elem;
     }
index 7cfe486..41f2064 100644 (file)
@@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
 import com.google.gson.TypeAdapterFactory;
 
 import java.io.BufferedInputStream;
@@ -109,9 +108,8 @@ public class RefreshConfigTask {
         logger.debug("Starting refreshConfigTask");
         stop();
         refreshTask = createRefreshTask() //
-            .subscribe(
-                notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger
-                    .error("Configuration refresh terminated due to exception {}", throwable.getMessage()),
+            .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
+                throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()),
                 () -> logger.error("Configuration refresh terminated"));
     }
 
@@ -128,7 +126,7 @@ public class RefreshConfigTask {
             .flatMap(notUsed -> loadConfigurationFromFile()) //
             .onErrorResume(this::ignoreError) //
             .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
-            .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
+            .doOnTerminate(() -> logger.info("loadFromFile Terminate"));
 
         Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
             .flatMap(this::createCbsClient) //
@@ -136,14 +134,18 @@ public class RefreshConfigTask {
             .onErrorResume(this::ignoreError) //
             .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
             .doOnNext(json -> this.isConsulUsed = true) //
-            .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
+            .doOnTerminate(() -> logger.info("loadFromConsul Terminated"));
 
         return Flux.merge(loadFromFile, loadFromConsul) //
             .flatMap(this::parseConfiguration) //
             .flatMap(this::updateConfig) //
             .doOnNext(this::handleUpdatedRicConfig) //
             .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
-            .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
+            .doOnTerminate(() -> handleTerminate("Configuration refresh task is terminated"));
+    }
+
+    private void handleTerminate(String info) {
+        logger.error(info);
     }
 
     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
@@ -157,7 +159,8 @@ public class RefreshConfigTask {
     private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
         final Duration initialDelay = Duration.ZERO;
         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
-        return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
+        return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL) //
+            .onErrorResume(this::ignoreError);
     }
 
     private <R> Mono<R> ignoreError(Throwable throwable) {
@@ -171,8 +174,9 @@ public class RefreshConfigTask {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             return Mono.just(parser.parse(jsonObject));
         } catch (ServiceException e) {
-            logger.error("Could not parse configuration {}", e.toString(), e);
-            return Mono.error(e);
+            String str = e.toString();
+            logger.error("Could not parse configuration {}", str);
+            return Mono.empty();
         }
     }
 
@@ -232,8 +236,8 @@ public class RefreshConfigTask {
             appParser.parse(rootObject);
             logger.debug("Local configuration file loaded: {}", filepath);
             return Flux.just(rootObject);
-        } catch (JsonSyntaxException | ServiceException | IOException e) {
-            logger.debug("Local configuration file not loaded: {}", filepath, e);
+        } catch (Exception e) {
+            logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage());
             return Flux.empty();
         }
     }
index 574a585..3cd6bb7 100644 (file)
Binary files a/policy-agent/src/main/resources/keystore.jks and b/policy-agent/src/main/resources/keystore.jks differ
index 6905c70..0027cca 100644 (file)
@@ -659,7 +659,7 @@ public class ApplicationTest {
     }
 
     private String baseUrl() {
-        return "http://localhost:" + port;
+        return "https://localhost:" + port;
     }
 
     private String jsonString() {
index 2b66a35..cdf614c 100644 (file)
@@ -123,6 +123,7 @@ public class MockPolicyAgent {
                     logger.error("Could not load json schema ", e);
                 }
             }
+            policyTypes.put(ImmutablePolicyType.builder().name("").schema("{}").build());
         }
     }
 
index 3303e3f..eb37ec5 100644 (file)
@@ -201,7 +201,7 @@ public class ApplicationConfigParserTest {
         JsonObject jsonRootObject = getJsonRootObject();
         JsonObject json = jsonRootObject.getAsJsonObject("config");
         json.remove("ric");
-        final String message = "Could not find member: ric in: " + json;
+        final String message = "Could not find member: 'ric' in: " + json;
 
         Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));