RUN mkdir -p /var/log/policy-agent
RUN mkdir -p /opt/app/policy-agent/etc/cert/
-EXPOSE 8081
+EXPOSE 8081 8433
ADD /config/* /opt/app/policy-agent/config/
ADD target/${JAR} /opt/app/policy-agent/policy-agent.jar
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.oransc.policyagent: INFO
file: /var/log/policy-agent/application.log
-app:
- filepath: /opt/app/policy-agent/config/application_configuration.json
server:
- port : 8081
+ port : 8433
+ ssl:
+ key-store-type: PKCS12
+ key-store-password: policy_agent
+ key-store: classpath:keystore.jks
+ key-password: policy_agent
+app:
+ filepath: /opt/app/policy-agent/config/application_configuration.json
+
+
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.catalina.connector.Connector;
import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Services;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
return new ObjectMapper();
}
+ @Bean
+ public ServletWebServerFactory servletContainer() {
+ TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
+ tomcat.addAdditionalTomcatConnectors(getHttpConnector());
+ return tomcat;
+ }
+
+ private static Connector getHttpConnector() {
+ Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
+ connector.setScheme("http");
+ connector.setPort(8081);
+ connector.setSecure(false);
+ return connector;
+ }
+
}
package org.oransc.policyagent.clients;
import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.lang.invoke.MethodHandles;
+import javax.net.ssl.SSLException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
*/
public class AsyncRestClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final WebClient client;
+ private WebClient webClient = null;
private final String baseUrl;
public AsyncRestClient(String baseUrl) {
-
- TcpClient tcpClient = TcpClient.create() //
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
- .doOnConnected(connection -> {
- connection.addHandler(new ReadTimeoutHandler(10));
- connection.addHandler(new WriteTimeoutHandler(30));
- });
- HttpClient httpClient = HttpClient.from(tcpClient);
- ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
-
- this.client = WebClient.builder() //
- .clientConnector(connector) //
- .baseUrl(baseUrl) //
- .build();
-
this.baseUrl = baseUrl;
}
public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
logger.debug("POST uri = '{}{}''", baseUrl, uri);
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .body(bodyProducer, String.class);
- return retrieve(request);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(request);
+ });
}
public Mono<String> post(String uri, @Nullable String body) {
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
logger.debug("POST (auth) uri = '{}{}''", baseUrl, uri);
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(request) //
- .flatMap(this::toBody);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(request) //
+ .flatMap(this::toBody);
+ });
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
logger.debug("PUT uri = '{}{}''", baseUrl, uri);
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(request);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(request);
+ });
}
public Mono<ResponseEntity<String>> putForEntity(String uri) {
logger.debug("PUT uri = '{}{}''", baseUrl, uri);
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri);
- return retrieve(request);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.put() //
+ .uri(uri);
+ return retrieve(request);
+ });
}
public Mono<String> put(String uri, String body) {
public Mono<ResponseEntity<String>> getForEntity(String uri) {
logger.debug("GET uri = '{}{}''", baseUrl, uri);
- RequestHeadersSpec<?> request = client.get().uri(uri);
- return retrieve(request);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.get().uri(uri);
+ return retrieve(request);
+ });
}
public Mono<String> get(String uri) {
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
logger.debug("DELETE uri = '{}{}''", baseUrl, uri);
- RequestHeadersSpec<?> request = client.delete().uri(uri);
- return retrieve(request);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.delete().uri(uri);
+ return retrieve(request);
+ });
}
public Mono<String> delete(String uri) {
}
}
+ private static SslContext createSslContext() throws SSLException {
+ return SslContextBuilder.forClient() //
+ .trustManager(InsecureTrustManagerFactory.INSTANCE) //
+ .build();
+ }
+
+ private static WebClient createWebClient(String baseUrl, SslContext sslContext) {
+ TcpClient tcpClient = TcpClient.create() //
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
+ .secure(c -> c.sslContext(sslContext)) //
+ .doOnConnected(connection -> {
+ connection.addHandler(new ReadTimeoutHandler(10));
+ connection.addHandler(new WriteTimeoutHandler(30));
+ });
+ HttpClient httpClient = HttpClient.from(tcpClient);
+ ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
+
+ return WebClient.builder() //
+ .clientConnector(connector) //
+ .baseUrl(baseUrl) //
+ .build();
+ }
+
+ private Mono<WebClient> getWebClient() {
+ if (this.webClient == null) {
+ try {
+ SslContext sslContext = createSslContext();
+ this.webClient = createWebClient(this.baseUrl, sslContext);
+ } catch (SSLException e) {
+ logger.error("Could not create WebClient {}", e.getMessage());
+ return Mono.error(e);
+ }
+ }
+ return Mono.just(this.webClient);
+ }
+
}
JsonObject ricAsJson = ricElem.getAsJsonObject();
JsonElement controllerNameElement = ricAsJson.get(CONTROLLER);
ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
- .name(ricAsJson.get("name").getAsString()) //
- .baseUrl(ricAsJson.get("baseUrl").getAsString()) //
- .managedElementIds(parseManagedElementIds(ricAsJson.get("managedElementIds").getAsJsonArray())) //
+ .name(get(ricAsJson, "name").getAsString()) //
+ .baseUrl(get(ricAsJson, "baseUrl").getAsString()) //
+ .managedElementIds(parseManagedElementIds(get(ricAsJson, "managedElementIds").getAsJsonArray())) //
.controllerName(controllerNameElement != null ? controllerNameElement.getAsString() : "") //
.build();
result.add(ricConfig);
for (JsonElement element : getAsJsonArray(config, CONTROLLER)) {
JsonObject controllerAsJson = element.getAsJsonObject();
ImmutableControllerConfig controllerConfig = ImmutableControllerConfig.builder() //
- .name(controllerAsJson.get("name").getAsString()) //
- .baseUrl(controllerAsJson.get("baseUrl").getAsString()) //
- .password(controllerAsJson.get("password").getAsString()) //
- .userName(controllerAsJson.get("userName").getAsString()) // )
+ .name(get(controllerAsJson, "name").getAsString()) //
+ .baseUrl(get(controllerAsJson, "baseUrl").getAsString()) //
+ .password(get(controllerAsJson, "password").getAsString()) //
+ .userName(get(controllerAsJson, "userName").getAsString()) // )
.build();
if (result.put(controllerConfig.name(), controllerConfig) != null) {
private static JsonElement get(JsonObject obj, String memberName) throws ServiceException {
JsonElement elem = obj.get(memberName);
if (elem == null) {
- throw new ServiceException("Could not find member: " + memberName + " in: " + obj);
+ throw new ServiceException("Could not find member: '" + memberName + "' in: " + obj);
}
return elem;
}
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
import java.io.BufferedInputStream;
logger.debug("Starting refreshConfigTask");
stop();
refreshTask = createRefreshTask() //
- .subscribe(
- notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger
- .error("Configuration refresh terminated due to exception {}", throwable.getMessage()),
+ .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()),
() -> logger.error("Configuration refresh terminated"));
}
.flatMap(notUsed -> loadConfigurationFromFile()) //
.onErrorResume(this::ignoreError) //
.doOnNext(json -> logger.debug("loadFromFile succeeded")) //
- .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
+ .doOnTerminate(() -> logger.info("loadFromFile Terminate"));
Flux<JsonObject> loadFromConsul = getEnvironment(systemEnvironment) //
.flatMap(this::createCbsClient) //
.onErrorResume(this::ignoreError) //
.doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
.doOnNext(json -> this.isConsulUsed = true) //
- .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
+ .doOnTerminate(() -> logger.info("loadFromConsul Terminated"));
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig) //
.doOnNext(this::handleUpdatedRicConfig) //
.flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
- .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
+ .doOnTerminate(() -> handleTerminate("Configuration refresh task is terminated"));
+ }
+
+ private void handleTerminate(String info) {
+ logger.error(info);
}
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
final Duration initialDelay = Duration.ZERO;
final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
- return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL);
+ return cbsClient.updates(getConfigRequest, initialDelay, CONSUL_CONFIG_REFRESH_INTERVAL) //
+ .onErrorResume(this::ignoreError);
}
private <R> Mono<R> ignoreError(Throwable throwable) {
ApplicationConfigParser parser = new ApplicationConfigParser();
return Mono.just(parser.parse(jsonObject));
} catch (ServiceException e) {
- logger.error("Could not parse configuration {}", e.toString(), e);
- return Mono.error(e);
+ String str = e.toString();
+ logger.error("Could not parse configuration {}", str);
+ return Mono.empty();
}
}
appParser.parse(rootObject);
logger.debug("Local configuration file loaded: {}", filepath);
return Flux.just(rootObject);
- } catch (JsonSyntaxException | ServiceException | IOException e) {
- logger.debug("Local configuration file not loaded: {}", filepath, e);
+ } catch (Exception e) {
+ logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage());
return Flux.empty();
}
}
}
private String baseUrl() {
- return "http://localhost:" + port;
+ return "https://localhost:" + port;
}
private String jsonString() {
logger.error("Could not load json schema ", e);
}
}
+ policyTypes.put(ImmutablePolicyType.builder().name("").schema("{}").build());
}
}
JsonObject jsonRootObject = getJsonRootObject();
JsonObject json = jsonRootObject.getAsJsonObject("config");
json.remove("ric");
- final String message = "Could not find member: ric in: " + json;
+ final String message = "Could not find member: 'ric' in: " + json;
Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));