From: PatrikBuhr Date: Wed, 15 Sep 2021 13:43:53 +0000 (+0200) Subject: NONRTRIC - Implement DMaaP mediator producer service in Java X-Git-Tag: 1.2.0~92 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F10%2F6710%2F5;p=nonrtric.git NONRTRIC - Implement DMaaP mediator producer service in Java Implement DMaaP mediator producer service in Java. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: I9927b3d25094b47484c9fcc60950b7b64c2a4006 --- diff --git a/dmaap-adaptor-java/Dockerfile b/dmaap-adaptor-java/Dockerfile new file mode 100644 index 00000000..46dfba55 --- /dev/null +++ b/dmaap-adaptor-java/Dockerfile @@ -0,0 +1,43 @@ +# +# ============LICENSE_START======================================================= +# ONAP : ccsdk oran +# ================================================================================ +# Copyright (C) 2019-2020 Nordix Foundation. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END========================================================= + + +FROM openjdk:11-jre-slim + +EXPOSE 8084 8435 + +ARG JAR + +WORKDIR /opt/app/dmaap-adaptor-service +RUN mkdir -p /var/log/dmaap-adaptor-service +RUN mkdir -p /opt/app/dmaap-adaptor-service/etc/cert/ +RUN mkdir -p /var/dmaap-adaptor-service +RUN chmod -R 777 /var/dmaap-adaptor-service + +ADD /config/application.yaml /opt/app/dmaap-adaptor-service/config/application.yaml +ADD /config/application_configuration.json /opt/app/dmaap-adaptor-service/data/application_configuration.json_example +ADD /config/keystore.jks /opt/app/dmaap-adaptor-service/etc/cert/keystore.jks +ADD /config/truststore.jks /opt/app/dmaap-adaptor-service/etc/cert/truststore.jks + +RUN chmod -R 777 /opt/app/dmaap-adaptor-service/config/ + +ADD target/${JAR} /opt/app/dmaap-adaptor-service/dmaap-adaptor.jar +CMD ["java", "-jar", "/opt/app/policy-agent/dmaap-adaptor.jar"] diff --git a/dmaap-adaptor-java/config/README b/dmaap-adaptor-java/config/README new file mode 100644 index 00000000..140927f7 --- /dev/null +++ b/dmaap-adaptor-java/config/README @@ -0,0 +1,41 @@ +The keystore.jks and truststore.jks files are created by using the following commands (note that this is an example): + +1) Create a CA certificate and a private key: + +openssl genrsa -des3 -out CA-key.pem 2048 +openssl req -new -key CA-key.pem -x509 -days 1000 -out CA-cert.pem + +2) Create a keystore with a private key entry that is signed by the CA: + +keytool -genkeypair -alias policy_agent -keyalg RSA -keysize 2048 -keystore keystore.jks -validity 3650 -storepass policy_agent +keytool -certreq -alias policy_agent -file request.csr -keystore keystore.jks -ext san=dns:your.domain.com -storepass policy_agent +openssl x509 -req -days 365 -in request.csr -CA CA-cert.pem -CAkey CA-key.pem -CAcreateserial -out ca_signed-cert.pem +keytool -importcert -alias ca_cert -file CA-cert.pem -keystore keystore.jks -trustcacerts -storepass policy_agent +keytool -importcert -alias policy_agent -file ca_signed-cert.pem -keystore keystore.jks -trustcacerts -storepass policy_agent + + +3) Create a trust store containing the CA cert (to trust all certs signed by the CA): + +keytool -genkeypair -alias not_used -keyalg RSA -keysize 2048 -keystore truststore.jks -validity 3650 -storepass policy_agent +keytool -importcert -alias ca_cert -file CA-cert.pem -keystore truststore.jks -trustcacerts -storepass policy_agent + + +4) Command for listing of the contents of jks files, examples: +keytool -list -v -keystore keystore.jks -storepass policy_agent +keytool -list -v -keystore truststore.jks -storepass policy_agent + +## License + +Copyright (C) 2020 Nordix Foundation. All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml new file mode 100644 index 00000000..3990ceb2 --- /dev/null +++ b/dmaap-adaptor-java/config/application.yaml @@ -0,0 +1,56 @@ +spring: + profiles: + active: prod + main: + allow-bean-definition-overriding: true + aop: + auto: false +management: + endpoints: + web: + exposure: + # Enabling of springboot actuator features. See springboot documentation. + include: "loggers,logfile,health,info,metrics,threaddump,heapdump" + +logging: + # Configuration of logging + level: + ROOT: ERROR + org.springframework: ERROR + org.springframework.data: ERROR + org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR + org.oran.dmaapadapter: INFO + file: + name: /var/log/dmaap-adaptor-service/application.log +server: + # Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework. + # See springboot documentation. + port : 8435 + http-port: 8084 + ssl: + key-store-type: JKS + key-store-password: policy_agent + key-store: /opt/app/dmaap-adaptor-service/etc/cert/keystore.jks + key-password: policy_agent + key-alias: policy_agent +app: + webclient: + # Configuration of the trust store used for the HTTP client (outgoing requests) + # The file location and the password for the truststore is only relevant if trust-store-used == true + # Note that the same keystore as for the server is used. + trust-store-used: false + trust-store-password: policy_agent + trust-store: /opt/app/dmaap-adaptor-service/etc/cert/truststore.jks + # Configuration of usage of HTTP Proxy for the southbound accesses. + # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s + http.proxy-host: + http.proxy-port: 0 + vardata-directory: /var/dmaap-adaptor-service + ecs-base-url: https://localhost:8434 + # Location of the component configuration file. The file will only be used if the Consul database is not used; + # configuration from the Consul will override the file. + configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json + dmaap-base-url: http://dradmin:dradmin@localhost:2222 + # The url used to adress this component. This is used as a callback url sent to other components. + dmaap-adapter-base-url: https://localhost:8435 + diff --git a/dmaap-adaptor-java/config/application_configuration.json b/dmaap-adaptor-java/config/application_configuration.json new file mode 100644 index 00000000..a8967d8b --- /dev/null +++ b/dmaap-adaptor-java/config/application_configuration.json @@ -0,0 +1,8 @@ +{ + "types": [ + { + "id": "ExampleInformationType", + "dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + } + ] +} diff --git a/dmaap-adaptor-java/config/keystore.jks b/dmaap-adaptor-java/config/keystore.jks new file mode 100644 index 00000000..122997ac Binary files /dev/null and b/dmaap-adaptor-java/config/keystore.jks differ diff --git a/dmaap-adaptor-java/config/truststore.jks b/dmaap-adaptor-java/config/truststore.jks new file mode 100644 index 00000000..60d62889 Binary files /dev/null and b/dmaap-adaptor-java/config/truststore.jks differ diff --git a/dmaap-adaptor-java/eclipse-formatter.xml b/dmaap-adaptor-java/eclipse-formatter.xml new file mode 100644 index 00000000..b2e86eb5 --- /dev/null +++ b/dmaap-adaptor-java/eclipse-formatter.xml @@ -0,0 +1,362 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dmaap-adaptor-java/pom.xml b/dmaap-adaptor-java/pom.xml new file mode 100644 index 00000000..01f51e27 --- /dev/null +++ b/dmaap-adaptor-java/pom.xml @@ -0,0 +1,383 @@ + + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.5.3 + + + org.o-ran-sc.nonrtric + dmaap-adaptor + 1.0.0-SNAPSHOT + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + onap-releases + onap-releases + https://nexus.onap.org/content/repositories/releases/ + + + + 11 + 3.0.0 + 2.8.2 + 1.1.6 + 2.1.6 + 20190722 + 3.6 + 3.8.0 + 2.12.2 + 1.24.3 + 3.0.11 + 0.30.0 + 1.1.11 + 2.1.1 + 3.7.0.1746 + 0.8.5 + true + + + + org.springdoc + springdoc-openapi-ui + 1.5.4 + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-thymeleaf + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework + spring-webflux + + + io.swagger.core.v3 + swagger-jaxrs2 + ${swagger.version} + + + io.swagger.core.v3 + swagger-jaxrs2-servlet-initializer + ${swagger.version} + + + javax.xml.bind + jaxb-api + + + org.immutables + value + ${immutable.version} + provided + + + org.immutables + gson + ${immutable.version} + + + org.json + json + ${json.version} + + + commons-net + commons-net + ${commons-net.version} + + + org.onap.dcaegen2.services.sdk.rest.services + cbs-client + ${sdk.version} + + + org.projectlombok + lombok + provided + + + javax.ws.rs + javax.ws.rs-api + ${javax.ws.rs-api.version} + + + + com.github.erosb + everit-json-schema + 1.12.1 + + + + org.springframework.boot + spring-boot-starter-actuator + + + + io.springfox + springfox-swagger2 + ${springfox.version} + + + io.springfox + springfox-swagger-ui + ${springfox.version} + + + + org.springframework.boot + spring-boot-devtools + true + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.awaitility + awaitility + test + + + io.projectreactor + reactor-test + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.mockito + mockito-junit-jupiter + test + + + org.mockito + mockito-core + test + + + com.squareup.okhttp3 + mockwebserver + test + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + net.revelc.code.formatter + formatter-maven-plugin + ${formatter-maven-plugin.version} + + ${project.basedir}/eclipse-formatter.xml + + + + + com.diffplug.spotless + spotless-maven-plugin + ${spotless-maven-plugin.version} + + + + + com,java,javax,org + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/annotations/ + + + + + + + org.jacoco + jacoco-maven-plugin + ${jacoco-maven-plugin.version} + + + default-prepare-agent + + prepare-agent + + + + default-report + prepare-package + + report + + + + + + io.swagger.codegen.v3 + swagger-codegen-maven-plugin + ${swagger-codegen-maven-plugin.version} + + + test + + generate + + + ${project.basedir}/api/api.json + openapi-yaml + ${project.basedir}/api + + ecs-api.yaml + + + + + + + io.fabric8 + docker-maven-plugin + ${docker-maven-plugin} + false + + + generate-dmaap-adaptor-image + package + + build + + + ${env.CONTAINER_PULL_REGISTRY} + + + o-ran-sc/nonrtric-dmaap-adaptor:${project.version} + + try + ${basedir} + Dockerfile + + ${project.build.finalName}.jar + + + ${project.version} + + + + + + + + push-dmaap-adaptor-image + + build + push + + + ${env.CONTAINER_PULL_REGISTRY} + ${env.CONTAINER_PUSH_REGISTRY} + + + o-ran-sc/nonrtric-dmaap-adaptor:${project.version} + + ${basedir} + Dockerfile + + ${project.build.finalName}.jar + + + ${project.version} + latest + + + + + + + + + + + org.sonarsource.scanner.maven + sonar-maven-plugin + ${sonar-maven-plugin.version} + + + + + JIRA + https://jira.o-ran-sc.org/ + + \ No newline at end of file diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/Application.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/Application.java new file mode 100644 index 00000000..aa10972d --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/Application.java @@ -0,0 +1,33 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java new file mode 100644 index 00000000..c9ba93fc --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java @@ -0,0 +1,79 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import java.util.Collection; + +import org.apache.catalina.connector.Connector; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.DmaapMessageConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class BeanFactory { + + @Value("${server.http-port}") + private int httpPort = 0; + + @Bean + public ApplicationConfig getApplicationConfig() { + return new ApplicationConfig(); + } + + @Bean + public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) { + Collection types = appConfig.getTypes(); + + // Start a consumer for each type + for (InfoType type : types) { + DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs); + topicConsumer.start(); + } + + return new InfoTypes(types); + } + + @Bean + public ServletWebServerFactory servletContainer() { + TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory(); + if (httpPort > 0) { + tomcat.addAdditionalTomcatConnectors(getHttpConnector(httpPort)); + } + return tomcat; + } + + private static Connector getHttpConnector(int httpPort) { + Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL); + connector.setScheme("http"); + connector.setPort(httpPort); + connector.setSecure(false); + return connector; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java new file mode 100644 index 00000000..ec1541cf --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -0,0 +1,239 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.clients; + +import io.netty.channel.ChannelOption; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.atomic.AtomicInteger; + +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.lang.Nullable; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; +import org.springframework.web.reactive.function.client.WebClientResponseException; + +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.transport.ProxyProvider; + +/** + * Generic reactive REST client. + */ +public class AsyncRestClient { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private WebClient webClient = null; + private final String baseUrl; + private static final AtomicInteger sequenceNumber = new AtomicInteger(); + private final SslContext sslContext; + private final HttpProxyConfig httpProxyConfig; + + public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) { + this.baseUrl = baseUrl; + this.sslContext = sslContext; + this.httpProxyConfig = httpProxyConfig; + } + + public Mono> postForEntity(String uri, @Nullable String body) { + Object traceTag = createTraceTag(); + logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} POST body: {}", traceTag, body); + Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(traceTag, request); + }); + } + + public Mono post(String uri, @Nullable String body) { + return postForEntity(uri, body) // + .flatMap(this::toBody); + } + + public Mono postWithAuthHeader(String uri, String body, String username, String password) { + Object traceTag = createTraceTag(); + logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} POST body: {}", traceTag, body); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.post() // + .uri(uri) // + .headers(headers -> headers.setBasicAuth(username, password)) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request) // + .flatMap(this::toBody); + }); + } + + public Mono> putForEntity(String uri, String body) { + Object traceTag = createTraceTag(); + logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} PUT body: {}", traceTag, body); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.put() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request); + }); + } + + public Mono> putForEntity(String uri) { + Object traceTag = createTraceTag(); + logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} PUT body: ", traceTag); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.put() // + .uri(uri); + return retrieve(traceTag, request); + }); + } + + public Mono put(String uri, String body) { + return putForEntity(uri, body) // + .flatMap(this::toBody); + } + + public Mono> getForEntity(String uri) { + Object traceTag = createTraceTag(); + logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.get().uri(uri); + return retrieve(traceTag, request); + }); + } + + public Mono get(String uri) { + return getForEntity(uri) // + .flatMap(this::toBody); + } + + public Mono> deleteForEntity(String uri) { + Object traceTag = createTraceTag(); + logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.delete().uri(uri); + return retrieve(traceTag, request); + }); + } + + public Mono delete(String uri) { + return deleteForEntity(uri) // + .flatMap(this::toBody); + } + + private Mono> retrieve(Object traceTag, RequestHeadersSpec request) { + final Class clazz = String.class; + return request.retrieve() // + .toEntity(clazz) // + .doOnNext(entity -> logReceivedData(traceTag, entity)) // + .doOnError(throwable -> onHttpError(traceTag, throwable)); + } + + private void logReceivedData(Object traceTag, ResponseEntity entity) { + logger.trace("{} Received: {} {}", traceTag, entity.getBody(), entity.getHeaders().getContentType()); + } + + private static Object createTraceTag() { + return sequenceNumber.incrementAndGet(); + } + + private void onHttpError(Object traceTag, Throwable t) { + if (t instanceof WebClientResponseException) { + WebClientResponseException exception = (WebClientResponseException) t; + logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(), + exception.getResponseBodyAsString()); + } else { + logger.debug("{} HTTP error {}", traceTag, t.getMessage()); + } + } + + private Mono toBody(ResponseEntity entity) { + if (entity.getBody() == null) { + return Mono.just(""); + } else { + return Mono.just(entity.getBody()); + } + } + + private boolean isHttpProxyConfigured() { + return httpProxyConfig != null && httpProxyConfig.httpProxyPort() > 0 + && !httpProxyConfig.httpProxyHost().isEmpty(); + } + + private HttpClient buildHttpClient() { + HttpClient httpClient = HttpClient.create() // + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // + .doOnConnected(connection -> { + connection.addHandlerLast(new ReadTimeoutHandler(30)); + connection.addHandlerLast(new WriteTimeoutHandler(30)); + }); + + if (this.sslContext != null) { + httpClient = httpClient.secure(ssl -> ssl.sslContext(sslContext)); + } + + if (isHttpProxyConfigured()) { + httpClient = httpClient.proxy(proxy -> proxy.type(ProxyProvider.Proxy.HTTP) + .host(httpProxyConfig.httpProxyHost()).port(httpProxyConfig.httpProxyPort())); + } + return httpClient; + } + + private WebClient buildWebClient(String baseUrl) { + final HttpClient httpClient = buildHttpClient(); + ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() // + .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) // + .build(); + return WebClient.builder() // + .clientConnector(new ReactorClientHttpConnector(httpClient)) // + .baseUrl(baseUrl) // + .exchangeStrategies(exchangeStrategies) // + .build(); + } + + private Mono getWebClient() { + if (this.webClient == null) { + this.webClient = buildWebClient(baseUrl); + } + return Mono.just(buildWebClient(baseUrl)); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java new file mode 100644 index 00000000..18e59003 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java @@ -0,0 +1,193 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.clients; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import javax.net.ssl.KeyManagerFactory; + +import org.oran.dmaapadapter.configuration.WebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.ResourceUtils; + +/** + * Factory for a generic reactive REST client. + */ +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +public class AsyncRestClientFactory { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final SslContextFactory sslContextFactory; + private final HttpProxyConfig httpProxyConfig; + + public AsyncRestClientFactory(WebClientConfig clientConfig) { + if (clientConfig != null) { + this.sslContextFactory = new CachingSslContextFactory(clientConfig); + this.httpProxyConfig = clientConfig.httpProxyConfig(); + } else { + logger.warn("No configuration for web client defined, HTTPS will not work"); + this.sslContextFactory = null; + this.httpProxyConfig = null; + } + } + + public AsyncRestClient createRestClientNoHttpProxy(String baseUrl) { + return createRestClient(baseUrl, false); + } + + public AsyncRestClient createRestClientUseHttpProxy(String baseUrl) { + return createRestClient(baseUrl, true); + } + + private AsyncRestClient createRestClient(String baseUrl, boolean useHttpProxy) { + if (this.sslContextFactory != null) { + try { + return new AsyncRestClient(baseUrl, this.sslContextFactory.createSslContext(), + useHttpProxy ? httpProxyConfig : null); + } catch (Exception e) { + String exceptionString = e.toString(); + logger.error("Could not init SSL context, reason: {}", exceptionString); + } + } + return new AsyncRestClient(baseUrl, null, httpProxyConfig); + } + + private class SslContextFactory { + private final WebClientConfig clientConfig; + + public SslContextFactory(WebClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + public SslContext createSslContext() throws UnrecoverableKeyException, NoSuchAlgorithmException, + CertificateException, KeyStoreException, IOException { + return this.createSslContext(createKeyManager()); + } + + private SslContext createSslContext(KeyManagerFactory keyManager) + throws NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException { + if (this.clientConfig.isTrustStoreUsed()) { + return createSslContextRejectingUntrustedPeers(this.clientConfig.trustStore(), + this.clientConfig.trustStorePassword(), keyManager); + } else { + // Trust anyone + return SslContextBuilder.forClient() // + .keyManager(keyManager) // + .trustManager(InsecureTrustManagerFactory.INSTANCE) // + .build(); + } + } + + private SslContext createSslContextRejectingUntrustedPeers(String trustStorePath, String trustStorePass, + KeyManagerFactory keyManager) + throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + + final KeyStore trustStore = getTrustStore(trustStorePath, trustStorePass); + List certificateList = Collections.list(trustStore.aliases()).stream() // + .filter(alias -> isCertificateEntry(trustStore, alias)) // + .map(alias -> getCertificate(trustStore, alias)) // + .collect(Collectors.toList()); + final X509Certificate[] certificates = certificateList.toArray(new X509Certificate[certificateList.size()]); + + return SslContextBuilder.forClient() // + .keyManager(keyManager) // + .trustManager(certificates) // + .build(); + } + + private boolean isCertificateEntry(KeyStore trustStore, String alias) { + try { + return trustStore.isCertificateEntry(alias); + } catch (KeyStoreException e) { + logger.error("Error reading truststore {}", e.getMessage()); + return false; + } + } + + private Certificate getCertificate(KeyStore trustStore, String alias) { + try { + return trustStore.getCertificate(alias); + } catch (KeyStoreException e) { + logger.error("Error reading truststore {}", e.getMessage()); + return null; + } + } + + private KeyManagerFactory createKeyManager() throws NoSuchAlgorithmException, CertificateException, IOException, + UnrecoverableKeyException, KeyStoreException { + final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + final KeyStore keyStore = KeyStore.getInstance(this.clientConfig.keyStoreType()); + final String keyStoreFile = this.clientConfig.keyStore(); + final String keyStorePassword = this.clientConfig.keyStorePassword(); + final String keyPassword = this.clientConfig.keyPassword(); + try (final InputStream inputStream = new FileInputStream(keyStoreFile)) { + keyStore.load(inputStream, keyStorePassword.toCharArray()); + } + keyManager.init(keyStore, keyPassword.toCharArray()); + return keyManager; + } + + private synchronized KeyStore getTrustStore(String trustStorePath, String trustStorePass) + throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + + KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType()); + store.load(new FileInputStream(ResourceUtils.getFile(trustStorePath)), trustStorePass.toCharArray()); + return store; + } + } + + public class CachingSslContextFactory extends SslContextFactory { + private SslContext cachedContext = null; + + public CachingSslContextFactory(WebClientConfig clientConfig) { + super(clientConfig); + } + + @Override + public SslContext createSslContext() throws UnrecoverableKeyException, NoSuchAlgorithmException, + CertificateException, KeyStoreException, IOException { + if (this.cachedContext == null) { + this.cachedContext = super.createSslContext(); + } + return this.cachedContext; + + } + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java new file mode 100644 index 00000000..e26fd46f --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -0,0 +1,133 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.configuration; + +import java.lang.invoke.MethodHandles; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; + +import lombok.Getter; + +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +@EnableConfigurationProperties +public class ApplicationConfig { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Getter + @Value("${app.configuration-filepath}") + private String localConfigurationFilePath; + + @Value("${server.ssl.key-store-type}") + private String sslKeyStoreType = ""; + + @Value("${server.ssl.key-store-password}") + private String sslKeyStorePassword = ""; + + @Value("${server.ssl.key-store}") + private String sslKeyStore = ""; + + @Value("${server.ssl.key-password}") + private String sslKeyPassword = ""; + + @Value("${app.webclient.trust-store-used}") + private boolean sslTrustStoreUsed = false; + + @Value("${app.webclient.trust-store-password}") + private String sslTrustStorePassword = ""; + + @Value("${app.webclient.trust-store}") + private String sslTrustStore = ""; + + @Value("${app.webclient.http.proxy-host:\"\"}") + private String httpProxyHost = ""; + + @Value("${app.webclient.http.proxy-port:0}") + private int httpProxyPort = 0; + + @Getter + @Value("${server.port}") + private int localServerHttpPort; + + @Getter + @Value("${app.ecs-base-url}") + private String ecsBaseUrl; + + @Getter + @Value("${app.dmaap-adapter-base-url}") + private String selfUrl; + + @Getter + @Value("${app.dmaap-base-url}") + private String dmaapBaseUrl; + + private WebClientConfig webClientConfig = null; + + public WebClientConfig getWebClientConfig() { + if (this.webClientConfig == null) { + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost(this.httpProxyHost) // + .httpProxyPort(this.httpProxyPort) // + .build(); + + this.webClientConfig = ImmutableWebClientConfig.builder() // + .keyStoreType(this.sslKeyStoreType) // + .keyStorePassword(this.sslKeyStorePassword) // + .keyStore(this.sslKeyStore) // + .keyPassword(this.sslKeyPassword) // + .isTrustStoreUsed(this.sslTrustStoreUsed) // + .trustStore(this.sslTrustStore) // + .trustStorePassword(this.sslTrustStorePassword) // + .httpProxyConfig(httpProxyConfig) // + .build(); + } + return this.webClientConfig; + } + + // Adapter to parse the json format of the configuration file. + static class ConfigFile { + Collection types; + } + + public Collection getTypes() { + com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + + try { + String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset()); + ConfigFile configData = gson.fromJson(configJson, ConfigFile.class); + return configData.types; + } catch (Exception e) { + logger.error("Could not load configuration file {}", getLocalConfigurationFilePath()); + return Collections.emptyList(); + } + + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/WebClientConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/WebClientConfig.java new file mode 100644 index 00000000..e65fdb99 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/WebClientConfig.java @@ -0,0 +1,54 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.configuration; + +import org.immutables.value.Value; + +@Value.Immutable +@Value.Style(redactedMask = "####") +public interface WebClientConfig { + public String keyStoreType(); + + @Value.Redacted + public String keyStorePassword(); + + public String keyStore(); + + @Value.Redacted + public String keyPassword(); + + public boolean isTrustStoreUsed(); + + @Value.Redacted + public String trustStorePassword(); + + public String trustStore(); + + @Value.Immutable + public interface HttpProxyConfig { + public String httpProxyHost(); + + public int httpProxyPort(); + } + + public HttpProxyConfig httpProxyConfig(); + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java new file mode 100644 index 00000000..39f62fb3 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java @@ -0,0 +1,112 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.controllers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; + +public class ErrorResponse { + private static Gson gson = new GsonBuilder() // + .create(); // + + // Returned as body for all failed REST calls + @Schema(name = "error_information", description = "Problem as defined in https://tools.ietf.org/html/rfc7807") + public static class ErrorInfo { + @SerializedName("type") + private String type = "about:blank"; + + @SerializedName("title") + private String title = null; + + @SerializedName("status") + private final Integer status; + + @SerializedName("detail") + private String detail = null; + + @SerializedName("instance") + private String instance = null; + + public ErrorInfo(String detail, Integer status) { + this.detail = detail; + this.status = status; + } + + @Schema(example = "503", + description = "The HTTP status code generated by the origin server for this occurrence of the problem. ") + public Integer getStatus() { + return status; + } + + @Schema(example = "Policy type not found", + description = " A human-readable explanation specific to this occurrence of the problem.") + public String getDetail() { + return this.detail; + } + + } + + @Schema(name = "message", description = "message") + public final String message; + + ErrorResponse(String message) { + this.message = message; + } + + static Mono> createMono(String text, HttpStatus code) { + return Mono.just(create(text, code)); + } + + static Mono> createMono(Exception e, HttpStatus code) { + return createMono(e.toString(), code); + } + + public static ResponseEntity create(String text, HttpStatus code) { + ErrorInfo p = new ErrorInfo(text, code.value()); + String json = gson.toJson(p); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_PROBLEM_JSON); + return new ResponseEntity<>(json, headers, code); + } + + public static ResponseEntity create(Throwable e, HttpStatus code) { + if (e instanceof RuntimeException) { + code = HttpStatus.INTERNAL_SERVER_ERROR; + } else if (e instanceof ServiceException) { + ServiceException se = (ServiceException) e; + if (se.getHttpStatus() != null) { + code = se.getHttpStatus(); + } + } + return create(e.toString(), code); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java new file mode 100644 index 00000000..4d99c58d --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -0,0 +1,119 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.controllers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; + +import org.oran.dmaapadapter.r1.ProducerJobInfo; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController("ConfigurationControllerV2") +@Tag(name = ProducerCallbacksController.API_NAME) +public class ProducerCallbacksController { + private static final Logger logger = LoggerFactory.getLogger(ProducerCallbacksController.class); + + public static final String API_NAME = "Management of configuration"; + public static final String API_DESCRIPTION = ""; + public static final String JOB_URL = "/dmaap_dataproducer/info_job"; + public static final String SUPERVISION_URL = "/dmaap_dataproducer/health_check"; + private static Gson gson = new GsonBuilder().create(); + private final Jobs jobs; + private final InfoTypes types; + + public ProducerCallbacksController(@Autowired Jobs jobs, @Autowired InfoTypes types) { + this.jobs = jobs; + this.types = types; + } + + @PostMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Callback for Information Job creation/modification", + description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity jobCreatedCallback( // + @RequestBody String body) { + try { + ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class); + + logger.info("Job started callback {}", request.id); + Job job = new Job(request.id, request.targetUri, types.getType(request.typeId)); + this.jobs.put(job); + return new ResponseEntity<>(HttpStatus.OK); + } catch (Exception e) { + return ErrorResponse.create(e, HttpStatus.NOT_FOUND); + } + } + + @DeleteMapping(path = JOB_URL + "/{infoJobId}", produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Callback for Information Job deletion", + description = "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity jobDeletedCallback( // + @PathVariable("infoJobId") String infoJobId) { + try { + logger.info("Job deleted callback {}", infoJobId); + this.jobs.remove(infoJobId); + return new ResponseEntity<>(HttpStatus.OK); + } catch (Exception e) { + return ErrorResponse.create(e, HttpStatus.NOT_FOUND); + } + } + + @GetMapping(path = SUPERVISION_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Producer supervision", + description = "The endpoint is provided by the Information Producer and is used for supervision of the producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "The producer is OK", // + content = @Content(schema = @Schema(implementation = String.class))) // + }) + public ResponseEntity producerSupervision() { + logger.info("Producer supervision"); + return new ResponseEntity<>(HttpStatus.OK); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/VoidResponse.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/VoidResponse.java new file mode 100644 index 00000000..b7bba5f4 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/VoidResponse.java @@ -0,0 +1,31 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.controllers; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "void", description = "Void/empty") +public class VoidResponse { + private VoidResponse() {} +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java new file mode 100644 index 00000000..740911d4 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java @@ -0,0 +1,49 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.exceptions; + +import lombok.Getter; + +import org.springframework.http.HttpStatus; + +public class ServiceException extends Exception { + + private static final long serialVersionUID = 1L; + + @Getter + private final HttpStatus httpStatus; + + public ServiceException(String message) { + super(message); + httpStatus = null; + } + + public ServiceException(String message, Exception originalException) { + super(message, originalException); + httpStatus = null; + } + + public ServiceException(String message, HttpStatus httpStatus) { + super(message); + this.httpStatus = httpStatus; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java new file mode 100644 index 00000000..ce4a3b78 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java @@ -0,0 +1,71 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "consumer_job", description = "Information for an Enrichment Information Job") +public class ConsumerJobInfo { + + @Schema(name = "info_type_id", description = "Information type Idenitifier of the subscription job", + required = true) + @SerializedName("info_type_id") + @JsonProperty(value = "info_type_id", required = true) + public String infoTypeId = ""; + + @Schema(name = "job_owner", description = "Identity of the owner of the job", required = true) + @SerializedName("job_owner") + @JsonProperty(value = "job_owner", required = true) + public String owner = ""; + + @Schema(name = "job_definition", description = "Information type specific job data", required = true) + @SerializedName("job_definition") + @JsonProperty(value = "job_definition", required = true) + public Object jobDefinition; + + @Schema(name = "job_result_uri", description = "The target URI of the subscribed information", required = true) + @SerializedName("job_result_uri") + @JsonProperty(value = "job_result_uri", required = true) + public String jobResultUri = ""; + + @Schema(name = "status_notification_uri", + description = "The target of Information subscription job status notifications", required = false) + @SerializedName("status_notification_uri") + @JsonProperty(value = "status_notification_uri", required = false) + public String statusNotificationUri = ""; + + public ConsumerJobInfo() {} + + public ConsumerJobInfo(String infoTypeId, Object jobData, String owner, String targetUri, + String statusNotificationUri) { + this.infoTypeId = infoTypeId; + this.jobDefinition = jobData; + this.owner = owner; + this.jobResultUri = targetUri; + this.statusNotificationUri = statusNotificationUri; + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerInfoTypeInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerInfoTypeInfo.java new file mode 100644 index 00000000..1bf5e474 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerInfoTypeInfo.java @@ -0,0 +1,52 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "producer_info_type_info", description = "Information for an Information Type") +public class ProducerInfoTypeInfo { + + @Schema(name = "info_job_data_schema", description = "Json schema for the job data", required = true) + @SerializedName("info_job_data_schema") + @JsonProperty(value = "info_job_data_schema", required = true) + public Object jobDataSchema; + + @Schema(name = "info_type_information", description = "Type specific information for the information type", + required = true) + @SerializedName("info_type_information") + @JsonProperty(value = "info_type_information", required = true) + public Object typeSpecificInformation; + + public ProducerInfoTypeInfo(Object jobDataSchema, Object typeSpecificInformation) { + this.jobDataSchema = jobDataSchema; + this.typeSpecificInformation = typeSpecificInformation; + } + + public ProducerInfoTypeInfo() {} + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerJobInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerJobInfo.java new file mode 100644 index 00000000..d378825a --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerJobInfo.java @@ -0,0 +1,77 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "producer_info_job_request", + description = "The body of the Information Producer callbacks for Information Job creation and deletion") +public class ProducerJobInfo { + + @Schema(name = "info_job_identity", description = "Identity of the Information Job", required = true) + @SerializedName("info_job_identity") + @JsonProperty("info_job_identity") + public String id = ""; + + @Schema(name = "info_type_identity", description = "Type identity for the job") + @SerializedName("info_type_identity") + @JsonProperty("info_type_identity") + public String typeId = ""; + + @Schema(name = "info_job_data", description = "Json for the job data") + @SerializedName("info_job_data") + @JsonProperty("info_job_data") + public Object jobData; + + @Schema(name = "target_uri", description = "URI for the target of the produced Information") + @SerializedName("target_uri") + @JsonProperty("target_uri") + public String targetUri = ""; + + @Schema(name = "owner", description = "The owner of the job") + @SerializedName("owner") + @JsonProperty("owner") + public String owner = ""; + + @Schema(name = "last_updated", description = "The time when the job was last updated or created (ISO-8601)") + @SerializedName("last_updated") + @JsonProperty("last_updated") + public String lastUpdated = ""; + + public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner, + String lastUpdated) { + this.id = id; + this.jobData = jobData; + this.typeId = typeId; + this.targetUri = targetUri; + this.owner = owner; + this.lastUpdated = lastUpdated; + } + + public ProducerJobInfo() {} + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerRegistrationInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerRegistrationInfo.java new file mode 100644 index 00000000..e54c1522 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerRegistrationInfo.java @@ -0,0 +1,64 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import java.util.Collection; + +import lombok.Builder; + +import org.immutables.gson.Gson; + +@Builder +@Gson.TypeAdapters +@Schema(name = "producer_registration_info", description = "Information for an Information Producer") +public class ProducerRegistrationInfo { + + @Schema(name = "supported_info_types", description = "Supported Information Type IDs", required = true) + @SerializedName("supported_info_types") + @JsonProperty(value = "supported_info_types", required = true) + public Collection supportedTypeIds; + + @Schema(name = "info_job_callback_url", description = "callback for Information Job", required = true) + @SerializedName("info_job_callback_url") + @JsonProperty(value = "info_job_callback_url", required = true) + public String jobCallbackUrl; + + @Schema(name = "info_producer_supervision_callback_url", description = "callback for producer supervision", + required = true) + @SerializedName("info_producer_supervision_callback_url") + @JsonProperty(value = "info_producer_supervision_callback_url", required = true) + public String producerSupervisionCallbackUrl; + + public ProducerRegistrationInfo(Collection types, String jobCallbackUrl, + String producerSupervisionCallbackUrl) { + this.supportedTypeIds = types; + this.jobCallbackUrl = jobCallbackUrl; + this.producerSupervisionCallbackUrl = producerSupervisionCallbackUrl; + } + + public ProducerRegistrationInfo() {} + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java new file mode 100644 index 00000000..d19577db --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -0,0 +1,38 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import lombok.Getter; + +public class InfoType { + + @Getter + private final String id; + + @Getter + private final String dmaapTopicUrl; + + public InfoType(String id, String dmaapTopicUrl) { + this.id = id; + this.dmaapTopicUrl = dmaapTopicUrl; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java new file mode 100644 index 00000000..b8677a37 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java @@ -0,0 +1,80 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfoTypes { + private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class); + + private Map allTypes = new HashMap<>(); + + public InfoTypes(Collection types) { + + for (InfoType type : types) { + put(type); + } + } + + public synchronized InfoType get(String id) { + return allTypes.get(id); + } + + public synchronized InfoType getType(String id) throws ServiceException { + InfoType type = allTypes.get(id); + if (type == null) { + throw new ServiceException("Could not find type: " + id); + } + return type; + } + + public static class ConfigFile { + Collection types; + } + + private synchronized void put(InfoType type) { + logger.debug("Put type: {}", type.getId()); + allTypes.put(type.getId(), type); + } + + public synchronized Iterable getAll() { + return new Vector<>(allTypes.values()); + } + + public synchronized Collection typeIds() { + return allTypes.keySet(); + } + + public synchronized int size() { + return allTypes.size(); + } + + public synchronized void clear() { + allTypes.clear(); + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java new file mode 100644 index 00000000..690e465b --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -0,0 +1,42 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import lombok.Getter; + +public class Job { + + @Getter + private final String id; + + @Getter + private final String callbackUrl; + + @Getter + private final InfoType type; + + public Job(String id, String callbackUrl, InfoType type) { + this.id = id; + this.callbackUrl = callbackUrl; + this.type = type; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java new file mode 100644 index 00000000..6e2b3265 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -0,0 +1,89 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class Jobs { + private static final Logger logger = LoggerFactory.getLogger(Jobs.class); + + private Map allJobs = new HashMap<>(); + private MultiMap jobsByType = new MultiMap<>(); + + public Jobs() {} + + public synchronized Job getJob(String id) throws ServiceException { + Job job = allJobs.get(id); + if (job == null) { + throw new ServiceException("Could not find job: " + id); + } + return job; + } + + public synchronized Job get(String id) { + return allJobs.get(id); + } + + public synchronized void put(Job job) { + logger.debug("Put service: {}", job.getId()); + allJobs.put(job.getId(), job); + jobsByType.put(job.getType().getId(), job.getId(), job); + } + + public synchronized Iterable getAll() { + return new Vector<>(allJobs.values()); + } + + public synchronized Job remove(String id) { + Job job = allJobs.get(id); + if (job != null) { + remove(job); + } + return job; + } + + public synchronized void remove(Job job) { + this.allJobs.remove(job.getId()); + jobsByType.remove(job.getType().getId(), job.getId()); + } + + public synchronized int size() { + return allJobs.size(); + } + + public synchronized Collection getJobsForType(InfoType type) { + return jobsByType.get(type.getId()); + } + + public synchronized void clear() { + allJobs.clear(); + jobsByType.clear(); + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java new file mode 100644 index 00000000..38f3d175 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java @@ -0,0 +1,65 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019-2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +/** + * A map, where each key can be bound to may values (where each value has an own + * ID) + */ +public class MultiMap { + + private final Map> map = new HashMap<>(); + + public void put(String key, String id, T value) { + this.map.computeIfAbsent(key, k -> new HashMap<>()).put(id, value); + } + + public T remove(String key, String id) { + Map innerMap = this.map.get(key); + if (innerMap != null) { + T removedElement = innerMap.remove(id); + if (innerMap.isEmpty()) { + this.map.remove(key); + } + return removedElement; + } + return null; + } + + public Collection get(String key) { + Map innerMap = this.map.get(key); + if (innerMap == null) { + return Collections.emptyList(); + } + return new Vector<>(innerMap.values()); + } + + public void clear() { + this.map.clear(); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java new file mode 100644 index 00000000..fb5c891c --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -0,0 +1,135 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import java.time.Duration; + +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.Jobs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +/** + * The class fetches incoming requests from DMAAP and sends them further to the + * consumers that has a job for this InformationType. + */ + +public class DmaapMessageConsumer { + private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); + private final ApplicationConfig applicationConfig; + private final AsyncRestClient restClient; + private final InfoType type; + private final Jobs jobs; + private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); + + /** Submits new elements until stopped */ + private static class InfiniteFlux { + private FluxSink sink; + private int counter = 0; + + public synchronized Flux start() { + stop(); + return Flux.create(this::next).doOnRequest(this::onRequest); + } + + public synchronized void stop() { + if (this.sink != null) { + this.sink.complete(); + this.sink = null; + } + } + + void onRequest(long no) { + logger.debug("InfiniteFlux.onRequest {}", no); + for (long i = 0; i < no; ++i) { + sink.next(counter++); + } + } + + void next(FluxSink sink) { + logger.debug("InfiniteFlux.next"); + this.sink = sink; + sink.next(counter++); + } + } + + public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { + this.applicationConfig = applicationConfig; + AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + this.restClient = restclientFactory.createRestClientNoHttpProxy(""); + this.type = type; + this.jobs = jobs; + } + + public void start() { + infiniteSubmitter.stop(); + + createTask().subscribe(// + value -> logger.debug("DmaapMessageConsumer next: {}", value), // + throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // + () -> logger.warn("DmaapMessageConsumer stopped") // + ); + } + + protected Flux createTask() { + final int CONCURRENCY = 5; + return infiniteSubmitter.start() // + .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // + .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // + .flatMap(this::handleReceivedMessage, CONCURRENCY); + } + + private String getDmaapUrl() { + return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl(); + } + + private Mono handleErrorResponse(Throwable t) { + logger.debug("error from DMAAP {}", t.getMessage()); + return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // + .flatMap(notUsed -> Mono.empty()); + } + + protected Mono getFromMessageRouter(String topicUrl) { + logger.trace("getFromMessageRouter {}", topicUrl); + return restClient.get(topicUrl) // + .onErrorResume(this::handleErrorResponse); + } + + protected Flux handleReceivedMessage(String body) { + logger.debug("Received from DMAAP {}", body); + final int CONCURRENCY = 5; + + // Distibute the body to all jobs for this type + return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // + .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) + .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) // + .onErrorResume(this::handleErrorResponse); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java new file mode 100644 index 00000000..b9a50b39 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -0,0 +1,170 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import com.google.gson.JsonParser; + +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; +import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Registers the types and this producer in ECS. This is done when needed. + */ +@Component +@EnableScheduling +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +public class ProducerRegstrationTask { + + private static final Logger logger = LoggerFactory.getLogger(ProducerRegstrationTask.class); + private final AsyncRestClient restClient; + private final ApplicationConfig applicationConfig; + private final InfoTypes types; + private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + + private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; + private boolean isRegisteredInEcs = false; + private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; + + public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) { + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + this.restClient = restClientFactory.createRestClientNoHttpProxy(""); + this.applicationConfig = applicationConfig; + this.types = types; + } + + @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) + public void supervisionTask() { + logger.debug("Checking producers starting"); + createTask().subscribe(null, null, () -> logger.debug("Producer registration completed")); + } + + public Mono createTask() { + return checkProducerRegistration() // + .doOnError(t -> isRegisteredInEcs = false) // + .onErrorResume(t -> registerTypesAndProducer()); + } + + public boolean isRegisteredInEcs() { + return this.isRegisteredInEcs; + } + + private Mono checkProducerRegistration() { + final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + return restClient.get(url) // + .flatMap(this::checkRegistrationInfo) // + ; + } + + private String registerTypeUrl(InfoType type) { + String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); + return url; + } + + private Mono registerTypesAndProducer() { + final String producerUrl = + applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + + return Flux.fromIterable(this.types.getAll()) // + .doOnNext(type -> logger.info("Registering type {}", type.getId())) // + .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) // + .collectList() // + .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) // + .onErrorResume(t -> { + logger.warn("Registration failed {}", t.getMessage()); + isRegisteredInEcs = false; + return Mono.empty(); + }) // + .doOnNext(x -> logger.debug("Registering types and producer completed")); + } + + private Object typeSpecifcInfoObject() { + return jsonObject("{}"); + } + + private ProducerInfoTypeInfo typeRegistrationInfo() { + return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject()); + } + + private Object jsonSchemaObject() { + // An object with no properties + String schemaStr = "{" // + + "\"type\": \"object\"," // + + "\"properties\": {}," // + + "\"additionalProperties\": false" // + + "}"; // + return jsonObject(schemaStr); + } + + private Object jsonObject(String json) { + try { + return JsonParser.parseString(json).getAsJsonObject(); + } catch (Exception e) { + logger.error("Bug, error in JSON: {}", json); + throw new NullPointerException(e.toString()); + } + } + + private Mono checkRegistrationInfo(String resp) { + ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class); + if (isEqual(producerRegistrationInfo(), info)) { + logger.debug("Already registered"); + this.isRegisteredInEcs = true; + return Mono.empty(); + } else { + return Mono.error(new ServiceException("Producer registration will be started")); + } + } + + private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) { + return a.jobCallbackUrl.equals(b.jobCallbackUrl) // + && a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) // + && a.supportedTypeIds.size() == b.supportedTypeIds.size(); + } + + private ProducerRegistrationInfo producerRegistrationInfo() { + + return ProducerRegistrationInfo.builder() // + .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) // + .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) // + .supportedTypeIds(types.typeIds()) // + .build(); + } + + private String baseUrl() { + return this.applicationConfig.getSelfUrl(); + } +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java new file mode 100644 index 00000000..ff50761f --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -0,0 +1,238 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.json.JSONObject; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; +import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) +@TestPropertySource(properties = { // + "server.ssl.key-store=./config/keystore.jks", // + "app.webclient.trust-store=./config/truststore.jks", // + "app.vardata-directory=./target", // + "app.configuration-filepath=./src/test/resources/test_application_configuration.json"// +}) +class ApplicationTest { + private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); + + @Autowired + private ApplicationConfig applicationConfig; + + @Autowired + private ProducerRegstrationTask producerRegstrationTask; + + @Autowired + private Jobs jobs; + + @Autowired + private InfoTypes types; + + @Autowired + private ConsumerController consumerController; + + @Autowired + private EcsSimulatorController ecsSimulatorController; + + @LocalServerPort + int localServerHttpPort; + + private static Gson gson = new GsonBuilder().create(); + + static class TestApplicationConfig extends ApplicationConfig { + @Override + public String getEcsBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getDmaapBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getSelfUrl() { + return thisProcessUrl(); + } + + private String thisProcessUrl() { + final String url = "https://localhost:" + getLocalServerHttpPort(); + return url; + } + } + + /** + * Overrides the BeanFactory. + */ + @TestConfiguration + static class TestBeanFactory extends BeanFactory { + + @Override + @Bean + public ServletWebServerFactory servletContainer() { + return new TomcatServletWebServerFactory(); + } + + @Override + @Bean + public ApplicationConfig getApplicationConfig() { + TestApplicationConfig cfg = new TestApplicationConfig(); + return cfg; + } + } + + @AfterEach + void reset() { + this.consumerController.testResults.reset(); + this.ecsSimulatorController.testResults.reset(); + this.jobs.clear(); + this.types.clear(); + } + + private AsyncRestClient restClient(boolean useTrustValidation) { + WebClientConfig config = this.applicationConfig.getWebClientConfig(); + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost("") // + .httpProxyPort(0) // + .build(); + config = ImmutableWebClientConfig.builder() // + .keyStoreType(config.keyStoreType()) // + .keyStorePassword(config.keyStorePassword()) // + .keyStore(config.keyStore()) // + .keyPassword(config.keyPassword()) // + .isTrustStoreUsed(useTrustValidation) // + .trustStore(config.trustStore()) // + .trustStorePassword(config.trustStorePassword()) // + .httpProxyConfig(httpProxyConfig).build(); + + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + return restClientFactory.createRestClientNoHttpProxy(baseUrl()); + } + + private AsyncRestClient restClient() { + return restClient(false); + } + + private String baseUrl() { + return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); + } + + private ConsumerJobInfo consumerJobInfo() { + InfoType type = this.types.getAll().iterator().next(); + return consumerJobInfo(type.getId(), "EI_JOB_ID"); + } + + private Object jsonObject() { + return jsonObject("{}"); + } + + private Object jsonObject(String json) { + try { + return JsonParser.parseString(json).getAsJsonObject(); + } catch (Exception e) { + throw new NullPointerException(e.toString()); + } + } + + private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) { + try { + String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; + return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, ""); + } catch (Exception e) { + return null; + } + } + + @Test + void generateApiDoc() throws IOException { + String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs"; + ResponseEntity resp = restClient().getForEntity(url).block(); + assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); + JSONObject jsonObj = new JSONObject(resp.getBody()); + assertThat(jsonObj.remove("servers")).isNotNull(); + + String indented = (jsonObj).toString(4); + String docDir = "api/"; + Files.createDirectories(Paths.get(docDir)); + try (PrintStream out = new PrintStream(new FileOutputStream(docDir + "api.json"))) { + out.print(indented); + } + } + + @Test + void testWholeChain() throws Exception { + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + + this.ecsSimulatorController.addJob(consumerJobInfo(), restClient()); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); + DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2)); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); + + } + +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java new file mode 100644 index 00000000..1dbe83f7 --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -0,0 +1,78 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.oran.dmaapadapter.controllers.VoidResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController("ConsumerSimulatorController") +@Tag(name = "Consts.PRODUCER_API_CALLBACKS_NAME") +public class ConsumerController { + + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String CONSUMER_TARGET_URL = "/consumer"; + + public static class TestResults { + + public List receivedBodies = Collections.synchronizedList(new ArrayList()); + + public TestResults() {} + + public void reset() { + receivedBodies.clear(); + } + } + + final TestResults testResults = new TestResults(); + + @PostMapping(path = CONSUMER_TARGET_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "GET from topic", description = "The call is invoked to push data to consumer") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity postData(@RequestBody String body) { + logger.info("Received by consumer: {}", body); + testResults.receivedBodies.add(body); + return new ResponseEntity<>(HttpStatus.OK); + } + +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java new file mode 100644 index 00000000..aa6220f3 --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java @@ -0,0 +1,72 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; + +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import org.oran.dmaapadapter.controllers.ErrorResponse; +import org.oran.dmaapadapter.controllers.VoidResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController("ProducerSimulatorController") +@Tag(name = "ProducerConsts.PRODUCER_API_CALLBACKS_NAME") +public class DmaapSimulatorController { + + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1"; + + public static List dmaapResponses = Collections.synchronizedList(new LinkedList()); + + @GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "GET from topic", + description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity getFromTopic() { + if (dmaapResponses.isEmpty()) { + return ErrorResponse.create("", HttpStatus.NOT_FOUND); + } else { + String resp = dmaapResponses.remove(0); + return new ResponseEntity<>(resp, HttpStatus.OK); + } + + } + +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java new file mode 100644 index 00000000..7d309cf8 --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java @@ -0,0 +1,108 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import io.swagger.v3.oas.annotations.tags.Tag; + +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; + +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; +import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; +import org.oran.dmaapadapter.r1.ProducerJobInfo; +import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController("EcsSimulatorController") +@Tag(name = "EcsSimulator") +public class EcsSimulatorController { + + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final static Gson gson = new GsonBuilder().create(); + + public static class TestResults { + + ProducerRegistrationInfo registrationInfo; + Map types = new HashMap<>(); + + public TestResults() {} + + public void reset() { + registrationInfo = null; + types.clear(); + } + } + + final TestResults testResults = new TestResults(); + public static final String API_ROOT = "/data-producer/v1"; + + @GetMapping(path = API_ROOT + "/info-producers/{infoProducerId}", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity getInfoProducer( // + @PathVariable("infoProducerId") String infoProducerId) { + + if (testResults.registrationInfo != null) { + return new ResponseEntity<>(gson.toJson(testResults.registrationInfo), HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + + } + + @PutMapping(path = API_ROOT + "/info-producers/{infoProducerId}", // + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity putInfoProducer( // + @PathVariable("infoProducerId") String infoProducerId, // + @RequestBody ProducerRegistrationInfo registrationInfo) { + testResults.registrationInfo = registrationInfo; + return new ResponseEntity<>(HttpStatus.OK); + } + + @PutMapping(path = API_ROOT + "/info-types/{infoTypeId}", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity putInfoType( // + @PathVariable("infoTypeId") String infoTypeId, // + @RequestBody ProducerInfoTypeInfo registrationInfo) { + testResults.types.put(infoTypeId, registrationInfo); + return new ResponseEntity<>(HttpStatus.OK); + } + + public void addJob(ConsumerJobInfo job, AsyncRestClient restClient) { + String url = this.testResults.registrationInfo.jobCallbackUrl; + ProducerJobInfo request = + new ProducerJobInfo(job.jobDefinition, "ID", job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP"); + String body = gson.toJson(request); + restClient.post(url, body).block(); + + } +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java new file mode 100644 index 00000000..54460043 --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java @@ -0,0 +1,227 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; +import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) +@TestPropertySource(properties = { // + "server.ssl.key-store=./config/keystore.jks", // + "app.webclient.trust-store=./config/truststore.jks", // + "app.vardata-directory=./target", // + "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // + "app.ecs-base-url=https://localhost:8434" // +}) +class IntegrationWithEcs { + private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); + + @Autowired + private ApplicationConfig applicationConfig; + + @Autowired + private ProducerRegstrationTask producerRegstrationTask; + + @Autowired + private Jobs jobs; + + @Autowired + private InfoTypes types; + + @Autowired + private ConsumerController consumerController; + + private static Gson gson = new GsonBuilder().create(); + + static class TestApplicationConfig extends ApplicationConfig { + + @Override + public String getEcsBaseUrl() { + return "https://localhost:8434"; + } + + @Override + public String getDmaapBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getSelfUrl() { + return thisProcessUrl(); + } + + private String thisProcessUrl() { + final String url = "https://localhost:" + getLocalServerHttpPort(); + return url; + } + } + + /** + * Overrides the BeanFactory. + */ + @TestConfiguration + static class TestBeanFactory extends BeanFactory { + + @Override + @Bean + public ServletWebServerFactory servletContainer() { + return new TomcatServletWebServerFactory(); + } + + @Override + @Bean + public ApplicationConfig getApplicationConfig() { + TestApplicationConfig cfg = new TestApplicationConfig(); + return cfg; + } + } + + @AfterEach + void reset() { + this.consumerController.testResults.reset(); + this.jobs.clear(); + this.types.clear(); + } + + private AsyncRestClient restClient(boolean useTrustValidation) { + WebClientConfig config = this.applicationConfig.getWebClientConfig(); + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost("") // + .httpProxyPort(0) // + .build(); + config = ImmutableWebClientConfig.builder() // + .keyStoreType(config.keyStoreType()) // + .keyStorePassword(config.keyStorePassword()) // + .keyStore(config.keyStore()) // + .keyPassword(config.keyPassword()) // + .isTrustStoreUsed(useTrustValidation) // + .trustStore(config.trustStore()) // + .trustStorePassword(config.trustStorePassword()) // + .httpProxyConfig(httpProxyConfig).build(); + + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + return restClientFactory.createRestClientNoHttpProxy(selfBaseUrl()); + } + + private AsyncRestClient restClient() { + return restClient(false); + } + + private String selfBaseUrl() { + return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); + } + + private String ecsBaseUrl() { + return applicationConfig.getEcsBaseUrl(); + } + + private void createInformationJobInEcs() { + String url = ecsBaseUrl() + "/data-consumer/v1/info-jobs/jobId"; + String body = gson.toJson(consumerJobInfo()); + try { + // Delete the job if it already exists + restClient().delete(url).block(); + } catch (Exception e) { + } + restClient().putForEntity(url, body).block(); + } + + private ConsumerJobInfo consumerJobInfo() { + InfoType type = this.types.getAll().iterator().next(); + return consumerJobInfo(type.getId(), "EI_JOB_ID"); + } + + private Object jsonObject() { + return jsonObject("{}"); + } + + private Object jsonObject(String json) { + try { + return JsonParser.parseString(json).getAsJsonObject(); + } catch (Exception e) { + throw new NullPointerException(e.toString()); + } + } + + private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) { + try { + String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL; + return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, ""); + } catch (Exception e) { + return null; + } + } + + @Test + void testWholeChain() throws Exception { + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + + createInformationJobInEcs(); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); + DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); + + ConsumerController.TestResults results = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2)); + assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); + + synchronized (this) { + // logger.warn("**************** Keeping server alive! " + + // this.applicationConfig.getLocalServerHttpPort()); + // this.wait(); + } + } + +} diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json new file mode 100644 index 00000000..0db20cb3 --- /dev/null +++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json @@ -0,0 +1,8 @@ +{ + "types": [ + { + "id": "ExampleInformationType", + "dmaapTopicUrl": "/dmaap-topic-1" + } + ] +} \ No newline at end of file diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java index e3089f92..85664f6b 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java @@ -126,7 +126,7 @@ public class InfoTypes { out.print(gson.toJson(type)); } } catch (Exception e) { - logger.warn("Could not save job: {} {}", type.getId(), e.getMessage()); + logger.warn("Could not save type: {} {}", type.getId(), e.getMessage()); } } diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java index 76063114..47df78a2 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java @@ -35,7 +35,7 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; @TestPropertySource( properties = { // "server.ssl.key-store=./config/keystore.jks", // - "app.webclient.trust-store=./config/truststore.jks"}) + "app.webclient.trust-store=./config/truststore.jks", "app.vardata-directory=./target"}) @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock. class MockEnrichmentService { private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); diff --git a/pom.xml b/pom.xml index ee840451..e0665a11 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ enrichment-coordinator-service r-app-catalogue helm-manager + dmaap-adaptor-java