From: John Keeney Date: Wed, 22 Sep 2021 12:33:16 +0000 (+0000) Subject: Merge "Test updates for ECS, PMS and SDNC" X-Git-Tag: 1.2.0~88 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=f0e49a07dad877f94f635dda4ab477b9636536c8;hp=83a750fdb5861f39bcb636c672e2eea4a6ec891b;p=nonrtric.git Merge "Test updates for ECS, PMS and SDNC" --- diff --git a/dmaap-adaptor-java/Dockerfile b/dmaap-adaptor-java/Dockerfile new file mode 100644 index 00000000..b2c0c30c --- /dev/null +++ b/dmaap-adaptor-java/Dockerfile @@ -0,0 +1,43 @@ +# +# ============LICENSE_START======================================================= +# O-RAN-SC +# ================================================================================ +# Copyright (C) 2021 Nordix Foundation. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END========================================================= + + +FROM openjdk:11-jre-slim + +EXPOSE 8084 8435 + +ARG JAR + +WORKDIR /opt/app/dmaap-adaptor-service +RUN mkdir -p /var/log/dmaap-adaptor-service +RUN mkdir -p /opt/app/dmaap-adaptor-service/etc/cert/ +RUN mkdir -p /var/dmaap-adaptor-service +RUN chmod -R 777 /var/dmaap-adaptor-service + +ADD /config/application.yaml /opt/app/dmaap-adaptor-service/config/application.yaml +ADD /config/application_configuration.json /opt/app/dmaap-adaptor-service/data/application_configuration.json_example +ADD /config/keystore.jks /opt/app/dmaap-adaptor-service/etc/cert/keystore.jks +ADD /config/truststore.jks /opt/app/dmaap-adaptor-service/etc/cert/truststore.jks + +RUN chmod -R 777 /opt/app/dmaap-adaptor-service/config/ + +ADD target/${JAR} /opt/app/dmaap-adaptor-service/dmaap-adaptor.jar +CMD ["java", "-jar", "/opt/app/dmaap-adaptor-service/dmaap-adaptor.jar"] diff --git a/dmaap-adaptor-java/config/README b/dmaap-adaptor-java/config/README new file mode 100644 index 00000000..140927f7 --- /dev/null +++ b/dmaap-adaptor-java/config/README @@ -0,0 +1,41 @@ +The keystore.jks and truststore.jks files are created by using the following commands (note that this is an example): + +1) Create a CA certificate and a private key: + +openssl genrsa -des3 -out CA-key.pem 2048 +openssl req -new -key CA-key.pem -x509 -days 1000 -out CA-cert.pem + +2) Create a keystore with a private key entry that is signed by the CA: + +keytool -genkeypair -alias policy_agent -keyalg RSA -keysize 2048 -keystore keystore.jks -validity 3650 -storepass policy_agent +keytool -certreq -alias policy_agent -file request.csr -keystore keystore.jks -ext san=dns:your.domain.com -storepass policy_agent +openssl x509 -req -days 365 -in request.csr -CA CA-cert.pem -CAkey CA-key.pem -CAcreateserial -out ca_signed-cert.pem +keytool -importcert -alias ca_cert -file CA-cert.pem -keystore keystore.jks -trustcacerts -storepass policy_agent +keytool -importcert -alias policy_agent -file ca_signed-cert.pem -keystore keystore.jks -trustcacerts -storepass policy_agent + + +3) Create a trust store containing the CA cert (to trust all certs signed by the CA): + +keytool -genkeypair -alias not_used -keyalg RSA -keysize 2048 -keystore truststore.jks -validity 3650 -storepass policy_agent +keytool -importcert -alias ca_cert -file CA-cert.pem -keystore truststore.jks -trustcacerts -storepass policy_agent + + +4) Command for listing of the contents of jks files, examples: +keytool -list -v -keystore keystore.jks -storepass policy_agent +keytool -list -v -keystore truststore.jks -storepass policy_agent + +## License + +Copyright (C) 2020 Nordix Foundation. All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml new file mode 100644 index 00000000..3990ceb2 --- /dev/null +++ b/dmaap-adaptor-java/config/application.yaml @@ -0,0 +1,56 @@ +spring: + profiles: + active: prod + main: + allow-bean-definition-overriding: true + aop: + auto: false +management: + endpoints: + web: + exposure: + # Enabling of springboot actuator features. See springboot documentation. + include: "loggers,logfile,health,info,metrics,threaddump,heapdump" + +logging: + # Configuration of logging + level: + ROOT: ERROR + org.springframework: ERROR + org.springframework.data: ERROR + org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR + org.oran.dmaapadapter: INFO + file: + name: /var/log/dmaap-adaptor-service/application.log +server: + # Configuration of the HTTP/REST server. The parameters are defined and handeled by the springboot framework. + # See springboot documentation. + port : 8435 + http-port: 8084 + ssl: + key-store-type: JKS + key-store-password: policy_agent + key-store: /opt/app/dmaap-adaptor-service/etc/cert/keystore.jks + key-password: policy_agent + key-alias: policy_agent +app: + webclient: + # Configuration of the trust store used for the HTTP client (outgoing requests) + # The file location and the password for the truststore is only relevant if trust-store-used == true + # Note that the same keystore as for the server is used. + trust-store-used: false + trust-store-password: policy_agent + trust-store: /opt/app/dmaap-adaptor-service/etc/cert/truststore.jks + # Configuration of usage of HTTP Proxy for the southbound accesses. + # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s + http.proxy-host: + http.proxy-port: 0 + vardata-directory: /var/dmaap-adaptor-service + ecs-base-url: https://localhost:8434 + # Location of the component configuration file. The file will only be used if the Consul database is not used; + # configuration from the Consul will override the file. + configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json + dmaap-base-url: http://dradmin:dradmin@localhost:2222 + # The url used to adress this component. This is used as a callback url sent to other components. + dmaap-adapter-base-url: https://localhost:8435 + diff --git a/dmaap-adaptor-java/config/application_configuration.json b/dmaap-adaptor-java/config/application_configuration.json new file mode 100644 index 00000000..a8967d8b --- /dev/null +++ b/dmaap-adaptor-java/config/application_configuration.json @@ -0,0 +1,8 @@ +{ + "types": [ + { + "id": "ExampleInformationType", + "dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + } + ] +} diff --git a/dmaap-adaptor-java/config/keystore.jks b/dmaap-adaptor-java/config/keystore.jks new file mode 100644 index 00000000..122997ac Binary files /dev/null and b/dmaap-adaptor-java/config/keystore.jks differ diff --git a/dmaap-adaptor-java/config/truststore.jks b/dmaap-adaptor-java/config/truststore.jks new file mode 100644 index 00000000..60d62889 Binary files /dev/null and b/dmaap-adaptor-java/config/truststore.jks differ diff --git a/dmaap-adaptor-java/eclipse-formatter.xml b/dmaap-adaptor-java/eclipse-formatter.xml new file mode 100644 index 00000000..b2e86eb5 --- /dev/null +++ b/dmaap-adaptor-java/eclipse-formatter.xml @@ -0,0 +1,362 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dmaap-adaptor-java/pom.xml b/dmaap-adaptor-java/pom.xml new file mode 100644 index 00000000..01f51e27 --- /dev/null +++ b/dmaap-adaptor-java/pom.xml @@ -0,0 +1,383 @@ + + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.5.3 + + + org.o-ran-sc.nonrtric + dmaap-adaptor + 1.0.0-SNAPSHOT + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + onap-releases + onap-releases + https://nexus.onap.org/content/repositories/releases/ + + + + 11 + 3.0.0 + 2.8.2 + 1.1.6 + 2.1.6 + 20190722 + 3.6 + 3.8.0 + 2.12.2 + 1.24.3 + 3.0.11 + 0.30.0 + 1.1.11 + 2.1.1 + 3.7.0.1746 + 0.8.5 + true + + + + org.springdoc + springdoc-openapi-ui + 1.5.4 + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-thymeleaf + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework + spring-webflux + + + io.swagger.core.v3 + swagger-jaxrs2 + ${swagger.version} + + + io.swagger.core.v3 + swagger-jaxrs2-servlet-initializer + ${swagger.version} + + + javax.xml.bind + jaxb-api + + + org.immutables + value + ${immutable.version} + provided + + + org.immutables + gson + ${immutable.version} + + + org.json + json + ${json.version} + + + commons-net + commons-net + ${commons-net.version} + + + org.onap.dcaegen2.services.sdk.rest.services + cbs-client + ${sdk.version} + + + org.projectlombok + lombok + provided + + + javax.ws.rs + javax.ws.rs-api + ${javax.ws.rs-api.version} + + + + com.github.erosb + everit-json-schema + 1.12.1 + + + + org.springframework.boot + spring-boot-starter-actuator + + + + io.springfox + springfox-swagger2 + ${springfox.version} + + + io.springfox + springfox-swagger-ui + ${springfox.version} + + + + org.springframework.boot + spring-boot-devtools + true + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.awaitility + awaitility + test + + + io.projectreactor + reactor-test + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.mockito + mockito-junit-jupiter + test + + + org.mockito + mockito-core + test + + + com.squareup.okhttp3 + mockwebserver + test + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + net.revelc.code.formatter + formatter-maven-plugin + ${formatter-maven-plugin.version} + + ${project.basedir}/eclipse-formatter.xml + + + + + com.diffplug.spotless + spotless-maven-plugin + ${spotless-maven-plugin.version} + + + + + com,java,javax,org + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/annotations/ + + + + + + + org.jacoco + jacoco-maven-plugin + ${jacoco-maven-plugin.version} + + + default-prepare-agent + + prepare-agent + + + + default-report + prepare-package + + report + + + + + + io.swagger.codegen.v3 + swagger-codegen-maven-plugin + ${swagger-codegen-maven-plugin.version} + + + test + + generate + + + ${project.basedir}/api/api.json + openapi-yaml + ${project.basedir}/api + + ecs-api.yaml + + + + + + + io.fabric8 + docker-maven-plugin + ${docker-maven-plugin} + false + + + generate-dmaap-adaptor-image + package + + build + + + ${env.CONTAINER_PULL_REGISTRY} + + + o-ran-sc/nonrtric-dmaap-adaptor:${project.version} + + try + ${basedir} + Dockerfile + + ${project.build.finalName}.jar + + + ${project.version} + + + + + + + + push-dmaap-adaptor-image + + build + push + + + ${env.CONTAINER_PULL_REGISTRY} + ${env.CONTAINER_PUSH_REGISTRY} + + + o-ran-sc/nonrtric-dmaap-adaptor:${project.version} + + ${basedir} + Dockerfile + + ${project.build.finalName}.jar + + + ${project.version} + latest + + + + + + + + + + + org.sonarsource.scanner.maven + sonar-maven-plugin + ${sonar-maven-plugin.version} + + + + + JIRA + https://jira.o-ran-sc.org/ + + \ No newline at end of file diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/Application.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/Application.java new file mode 100644 index 00000000..aa10972d --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/Application.java @@ -0,0 +1,33 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java new file mode 100644 index 00000000..c9ba93fc --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java @@ -0,0 +1,79 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import java.util.Collection; + +import org.apache.catalina.connector.Connector; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.DmaapMessageConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class BeanFactory { + + @Value("${server.http-port}") + private int httpPort = 0; + + @Bean + public ApplicationConfig getApplicationConfig() { + return new ApplicationConfig(); + } + + @Bean + public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) { + Collection types = appConfig.getTypes(); + + // Start a consumer for each type + for (InfoType type : types) { + DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs); + topicConsumer.start(); + } + + return new InfoTypes(types); + } + + @Bean + public ServletWebServerFactory servletContainer() { + TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory(); + if (httpPort > 0) { + tomcat.addAdditionalTomcatConnectors(getHttpConnector(httpPort)); + } + return tomcat; + } + + private static Connector getHttpConnector(int httpPort) { + Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL); + connector.setScheme("http"); + connector.setPort(httpPort); + connector.setSecure(false); + return connector; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java new file mode 100644 index 00000000..ec1541cf --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -0,0 +1,239 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.clients; + +import io.netty.channel.ChannelOption; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.atomic.AtomicInteger; + +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.lang.Nullable; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; +import org.springframework.web.reactive.function.client.WebClientResponseException; + +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.transport.ProxyProvider; + +/** + * Generic reactive REST client. + */ +public class AsyncRestClient { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private WebClient webClient = null; + private final String baseUrl; + private static final AtomicInteger sequenceNumber = new AtomicInteger(); + private final SslContext sslContext; + private final HttpProxyConfig httpProxyConfig; + + public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) { + this.baseUrl = baseUrl; + this.sslContext = sslContext; + this.httpProxyConfig = httpProxyConfig; + } + + public Mono> postForEntity(String uri, @Nullable String body) { + Object traceTag = createTraceTag(); + logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} POST body: {}", traceTag, body); + Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(traceTag, request); + }); + } + + public Mono post(String uri, @Nullable String body) { + return postForEntity(uri, body) // + .flatMap(this::toBody); + } + + public Mono postWithAuthHeader(String uri, String body, String username, String password) { + Object traceTag = createTraceTag(); + logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} POST body: {}", traceTag, body); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.post() // + .uri(uri) // + .headers(headers -> headers.setBasicAuth(username, password)) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request) // + .flatMap(this::toBody); + }); + } + + public Mono> putForEntity(String uri, String body) { + Object traceTag = createTraceTag(); + logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} PUT body: {}", traceTag, body); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.put() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request); + }); + } + + public Mono> putForEntity(String uri) { + Object traceTag = createTraceTag(); + logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); + logger.trace("{} PUT body: ", traceTag); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.put() // + .uri(uri); + return retrieve(traceTag, request); + }); + } + + public Mono put(String uri, String body) { + return putForEntity(uri, body) // + .flatMap(this::toBody); + } + + public Mono> getForEntity(String uri) { + Object traceTag = createTraceTag(); + logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.get().uri(uri); + return retrieve(traceTag, request); + }); + } + + public Mono get(String uri) { + return getForEntity(uri) // + .flatMap(this::toBody); + } + + public Mono> deleteForEntity(String uri) { + Object traceTag = createTraceTag(); + logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); + return getWebClient() // + .flatMap(client -> { + RequestHeadersSpec request = client.delete().uri(uri); + return retrieve(traceTag, request); + }); + } + + public Mono delete(String uri) { + return deleteForEntity(uri) // + .flatMap(this::toBody); + } + + private Mono> retrieve(Object traceTag, RequestHeadersSpec request) { + final Class clazz = String.class; + return request.retrieve() // + .toEntity(clazz) // + .doOnNext(entity -> logReceivedData(traceTag, entity)) // + .doOnError(throwable -> onHttpError(traceTag, throwable)); + } + + private void logReceivedData(Object traceTag, ResponseEntity entity) { + logger.trace("{} Received: {} {}", traceTag, entity.getBody(), entity.getHeaders().getContentType()); + } + + private static Object createTraceTag() { + return sequenceNumber.incrementAndGet(); + } + + private void onHttpError(Object traceTag, Throwable t) { + if (t instanceof WebClientResponseException) { + WebClientResponseException exception = (WebClientResponseException) t; + logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(), + exception.getResponseBodyAsString()); + } else { + logger.debug("{} HTTP error {}", traceTag, t.getMessage()); + } + } + + private Mono toBody(ResponseEntity entity) { + if (entity.getBody() == null) { + return Mono.just(""); + } else { + return Mono.just(entity.getBody()); + } + } + + private boolean isHttpProxyConfigured() { + return httpProxyConfig != null && httpProxyConfig.httpProxyPort() > 0 + && !httpProxyConfig.httpProxyHost().isEmpty(); + } + + private HttpClient buildHttpClient() { + HttpClient httpClient = HttpClient.create() // + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // + .doOnConnected(connection -> { + connection.addHandlerLast(new ReadTimeoutHandler(30)); + connection.addHandlerLast(new WriteTimeoutHandler(30)); + }); + + if (this.sslContext != null) { + httpClient = httpClient.secure(ssl -> ssl.sslContext(sslContext)); + } + + if (isHttpProxyConfigured()) { + httpClient = httpClient.proxy(proxy -> proxy.type(ProxyProvider.Proxy.HTTP) + .host(httpProxyConfig.httpProxyHost()).port(httpProxyConfig.httpProxyPort())); + } + return httpClient; + } + + private WebClient buildWebClient(String baseUrl) { + final HttpClient httpClient = buildHttpClient(); + ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() // + .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) // + .build(); + return WebClient.builder() // + .clientConnector(new ReactorClientHttpConnector(httpClient)) // + .baseUrl(baseUrl) // + .exchangeStrategies(exchangeStrategies) // + .build(); + } + + private Mono getWebClient() { + if (this.webClient == null) { + this.webClient = buildWebClient(baseUrl); + } + return Mono.just(buildWebClient(baseUrl)); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java new file mode 100644 index 00000000..18e59003 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java @@ -0,0 +1,193 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.clients; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import javax.net.ssl.KeyManagerFactory; + +import org.oran.dmaapadapter.configuration.WebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.ResourceUtils; + +/** + * Factory for a generic reactive REST client. + */ +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +public class AsyncRestClientFactory { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final SslContextFactory sslContextFactory; + private final HttpProxyConfig httpProxyConfig; + + public AsyncRestClientFactory(WebClientConfig clientConfig) { + if (clientConfig != null) { + this.sslContextFactory = new CachingSslContextFactory(clientConfig); + this.httpProxyConfig = clientConfig.httpProxyConfig(); + } else { + logger.warn("No configuration for web client defined, HTTPS will not work"); + this.sslContextFactory = null; + this.httpProxyConfig = null; + } + } + + public AsyncRestClient createRestClientNoHttpProxy(String baseUrl) { + return createRestClient(baseUrl, false); + } + + public AsyncRestClient createRestClientUseHttpProxy(String baseUrl) { + return createRestClient(baseUrl, true); + } + + private AsyncRestClient createRestClient(String baseUrl, boolean useHttpProxy) { + if (this.sslContextFactory != null) { + try { + return new AsyncRestClient(baseUrl, this.sslContextFactory.createSslContext(), + useHttpProxy ? httpProxyConfig : null); + } catch (Exception e) { + String exceptionString = e.toString(); + logger.error("Could not init SSL context, reason: {}", exceptionString); + } + } + return new AsyncRestClient(baseUrl, null, httpProxyConfig); + } + + private class SslContextFactory { + private final WebClientConfig clientConfig; + + public SslContextFactory(WebClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + public SslContext createSslContext() throws UnrecoverableKeyException, NoSuchAlgorithmException, + CertificateException, KeyStoreException, IOException { + return this.createSslContext(createKeyManager()); + } + + private SslContext createSslContext(KeyManagerFactory keyManager) + throws NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException { + if (this.clientConfig.isTrustStoreUsed()) { + return createSslContextRejectingUntrustedPeers(this.clientConfig.trustStore(), + this.clientConfig.trustStorePassword(), keyManager); + } else { + // Trust anyone + return SslContextBuilder.forClient() // + .keyManager(keyManager) // + .trustManager(InsecureTrustManagerFactory.INSTANCE) // + .build(); + } + } + + private SslContext createSslContextRejectingUntrustedPeers(String trustStorePath, String trustStorePass, + KeyManagerFactory keyManager) + throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + + final KeyStore trustStore = getTrustStore(trustStorePath, trustStorePass); + List certificateList = Collections.list(trustStore.aliases()).stream() // + .filter(alias -> isCertificateEntry(trustStore, alias)) // + .map(alias -> getCertificate(trustStore, alias)) // + .collect(Collectors.toList()); + final X509Certificate[] certificates = certificateList.toArray(new X509Certificate[certificateList.size()]); + + return SslContextBuilder.forClient() // + .keyManager(keyManager) // + .trustManager(certificates) // + .build(); + } + + private boolean isCertificateEntry(KeyStore trustStore, String alias) { + try { + return trustStore.isCertificateEntry(alias); + } catch (KeyStoreException e) { + logger.error("Error reading truststore {}", e.getMessage()); + return false; + } + } + + private Certificate getCertificate(KeyStore trustStore, String alias) { + try { + return trustStore.getCertificate(alias); + } catch (KeyStoreException e) { + logger.error("Error reading truststore {}", e.getMessage()); + return null; + } + } + + private KeyManagerFactory createKeyManager() throws NoSuchAlgorithmException, CertificateException, IOException, + UnrecoverableKeyException, KeyStoreException { + final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + final KeyStore keyStore = KeyStore.getInstance(this.clientConfig.keyStoreType()); + final String keyStoreFile = this.clientConfig.keyStore(); + final String keyStorePassword = this.clientConfig.keyStorePassword(); + final String keyPassword = this.clientConfig.keyPassword(); + try (final InputStream inputStream = new FileInputStream(keyStoreFile)) { + keyStore.load(inputStream, keyStorePassword.toCharArray()); + } + keyManager.init(keyStore, keyPassword.toCharArray()); + return keyManager; + } + + private synchronized KeyStore getTrustStore(String trustStorePath, String trustStorePass) + throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + + KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType()); + store.load(new FileInputStream(ResourceUtils.getFile(trustStorePath)), trustStorePass.toCharArray()); + return store; + } + } + + public class CachingSslContextFactory extends SslContextFactory { + private SslContext cachedContext = null; + + public CachingSslContextFactory(WebClientConfig clientConfig) { + super(clientConfig); + } + + @Override + public SslContext createSslContext() throws UnrecoverableKeyException, NoSuchAlgorithmException, + CertificateException, KeyStoreException, IOException { + if (this.cachedContext == null) { + this.cachedContext = super.createSslContext(); + } + return this.cachedContext; + + } + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java new file mode 100644 index 00000000..e26fd46f --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -0,0 +1,133 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.configuration; + +import java.lang.invoke.MethodHandles; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; + +import lombok.Getter; + +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +@EnableConfigurationProperties +public class ApplicationConfig { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Getter + @Value("${app.configuration-filepath}") + private String localConfigurationFilePath; + + @Value("${server.ssl.key-store-type}") + private String sslKeyStoreType = ""; + + @Value("${server.ssl.key-store-password}") + private String sslKeyStorePassword = ""; + + @Value("${server.ssl.key-store}") + private String sslKeyStore = ""; + + @Value("${server.ssl.key-password}") + private String sslKeyPassword = ""; + + @Value("${app.webclient.trust-store-used}") + private boolean sslTrustStoreUsed = false; + + @Value("${app.webclient.trust-store-password}") + private String sslTrustStorePassword = ""; + + @Value("${app.webclient.trust-store}") + private String sslTrustStore = ""; + + @Value("${app.webclient.http.proxy-host:\"\"}") + private String httpProxyHost = ""; + + @Value("${app.webclient.http.proxy-port:0}") + private int httpProxyPort = 0; + + @Getter + @Value("${server.port}") + private int localServerHttpPort; + + @Getter + @Value("${app.ecs-base-url}") + private String ecsBaseUrl; + + @Getter + @Value("${app.dmaap-adapter-base-url}") + private String selfUrl; + + @Getter + @Value("${app.dmaap-base-url}") + private String dmaapBaseUrl; + + private WebClientConfig webClientConfig = null; + + public WebClientConfig getWebClientConfig() { + if (this.webClientConfig == null) { + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost(this.httpProxyHost) // + .httpProxyPort(this.httpProxyPort) // + .build(); + + this.webClientConfig = ImmutableWebClientConfig.builder() // + .keyStoreType(this.sslKeyStoreType) // + .keyStorePassword(this.sslKeyStorePassword) // + .keyStore(this.sslKeyStore) // + .keyPassword(this.sslKeyPassword) // + .isTrustStoreUsed(this.sslTrustStoreUsed) // + .trustStore(this.sslTrustStore) // + .trustStorePassword(this.sslTrustStorePassword) // + .httpProxyConfig(httpProxyConfig) // + .build(); + } + return this.webClientConfig; + } + + // Adapter to parse the json format of the configuration file. + static class ConfigFile { + Collection types; + } + + public Collection getTypes() { + com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + + try { + String configJson = Files.readString(Path.of(getLocalConfigurationFilePath()), Charset.defaultCharset()); + ConfigFile configData = gson.fromJson(configJson, ConfigFile.class); + return configData.types; + } catch (Exception e) { + logger.error("Could not load configuration file {}", getLocalConfigurationFilePath()); + return Collections.emptyList(); + } + + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/WebClientConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/WebClientConfig.java new file mode 100644 index 00000000..e65fdb99 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/WebClientConfig.java @@ -0,0 +1,54 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.configuration; + +import org.immutables.value.Value; + +@Value.Immutable +@Value.Style(redactedMask = "####") +public interface WebClientConfig { + public String keyStoreType(); + + @Value.Redacted + public String keyStorePassword(); + + public String keyStore(); + + @Value.Redacted + public String keyPassword(); + + public boolean isTrustStoreUsed(); + + @Value.Redacted + public String trustStorePassword(); + + public String trustStore(); + + @Value.Immutable + public interface HttpProxyConfig { + public String httpProxyHost(); + + public int httpProxyPort(); + } + + public HttpProxyConfig httpProxyConfig(); + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java new file mode 100644 index 00000000..39f62fb3 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java @@ -0,0 +1,112 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.controllers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; + +public class ErrorResponse { + private static Gson gson = new GsonBuilder() // + .create(); // + + // Returned as body for all failed REST calls + @Schema(name = "error_information", description = "Problem as defined in https://tools.ietf.org/html/rfc7807") + public static class ErrorInfo { + @SerializedName("type") + private String type = "about:blank"; + + @SerializedName("title") + private String title = null; + + @SerializedName("status") + private final Integer status; + + @SerializedName("detail") + private String detail = null; + + @SerializedName("instance") + private String instance = null; + + public ErrorInfo(String detail, Integer status) { + this.detail = detail; + this.status = status; + } + + @Schema(example = "503", + description = "The HTTP status code generated by the origin server for this occurrence of the problem. ") + public Integer getStatus() { + return status; + } + + @Schema(example = "Policy type not found", + description = " A human-readable explanation specific to this occurrence of the problem.") + public String getDetail() { + return this.detail; + } + + } + + @Schema(name = "message", description = "message") + public final String message; + + ErrorResponse(String message) { + this.message = message; + } + + static Mono> createMono(String text, HttpStatus code) { + return Mono.just(create(text, code)); + } + + static Mono> createMono(Exception e, HttpStatus code) { + return createMono(e.toString(), code); + } + + public static ResponseEntity create(String text, HttpStatus code) { + ErrorInfo p = new ErrorInfo(text, code.value()); + String json = gson.toJson(p); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_PROBLEM_JSON); + return new ResponseEntity<>(json, headers, code); + } + + public static ResponseEntity create(Throwable e, HttpStatus code) { + if (e instanceof RuntimeException) { + code = HttpStatus.INTERNAL_SERVER_ERROR; + } else if (e instanceof ServiceException) { + ServiceException se = (ServiceException) e; + if (se.getHttpStatus() != null) { + code = se.getHttpStatus(); + } + } + return create(e.toString(), code); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java new file mode 100644 index 00000000..4d99c58d --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -0,0 +1,119 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.controllers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; + +import org.oran.dmaapadapter.r1.ProducerJobInfo; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController("ConfigurationControllerV2") +@Tag(name = ProducerCallbacksController.API_NAME) +public class ProducerCallbacksController { + private static final Logger logger = LoggerFactory.getLogger(ProducerCallbacksController.class); + + public static final String API_NAME = "Management of configuration"; + public static final String API_DESCRIPTION = ""; + public static final String JOB_URL = "/dmaap_dataproducer/info_job"; + public static final String SUPERVISION_URL = "/dmaap_dataproducer/health_check"; + private static Gson gson = new GsonBuilder().create(); + private final Jobs jobs; + private final InfoTypes types; + + public ProducerCallbacksController(@Autowired Jobs jobs, @Autowired InfoTypes types) { + this.jobs = jobs; + this.types = types; + } + + @PostMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Callback for Information Job creation/modification", + description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity jobCreatedCallback( // + @RequestBody String body) { + try { + ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class); + + logger.info("Job started callback {}", request.id); + Job job = new Job(request.id, request.targetUri, types.getType(request.typeId)); + this.jobs.put(job); + return new ResponseEntity<>(HttpStatus.OK); + } catch (Exception e) { + return ErrorResponse.create(e, HttpStatus.NOT_FOUND); + } + } + + @DeleteMapping(path = JOB_URL + "/{infoJobId}", produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Callback for Information Job deletion", + description = "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity jobDeletedCallback( // + @PathVariable("infoJobId") String infoJobId) { + try { + logger.info("Job deleted callback {}", infoJobId); + this.jobs.remove(infoJobId); + return new ResponseEntity<>(HttpStatus.OK); + } catch (Exception e) { + return ErrorResponse.create(e, HttpStatus.NOT_FOUND); + } + } + + @GetMapping(path = SUPERVISION_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Producer supervision", + description = "The endpoint is provided by the Information Producer and is used for supervision of the producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "The producer is OK", // + content = @Content(schema = @Schema(implementation = String.class))) // + }) + public ResponseEntity producerSupervision() { + logger.info("Producer supervision"); + return new ResponseEntity<>(HttpStatus.OK); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/VoidResponse.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/VoidResponse.java new file mode 100644 index 00000000..b7bba5f4 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/VoidResponse.java @@ -0,0 +1,31 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.controllers; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "void", description = "Void/empty") +public class VoidResponse { + private VoidResponse() {} +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java new file mode 100644 index 00000000..740911d4 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/exceptions/ServiceException.java @@ -0,0 +1,49 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.exceptions; + +import lombok.Getter; + +import org.springframework.http.HttpStatus; + +public class ServiceException extends Exception { + + private static final long serialVersionUID = 1L; + + @Getter + private final HttpStatus httpStatus; + + public ServiceException(String message) { + super(message); + httpStatus = null; + } + + public ServiceException(String message, Exception originalException) { + super(message, originalException); + httpStatus = null; + } + + public ServiceException(String message, HttpStatus httpStatus) { + super(message); + this.httpStatus = httpStatus; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java new file mode 100644 index 00000000..ce4a3b78 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java @@ -0,0 +1,71 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "consumer_job", description = "Information for an Enrichment Information Job") +public class ConsumerJobInfo { + + @Schema(name = "info_type_id", description = "Information type Idenitifier of the subscription job", + required = true) + @SerializedName("info_type_id") + @JsonProperty(value = "info_type_id", required = true) + public String infoTypeId = ""; + + @Schema(name = "job_owner", description = "Identity of the owner of the job", required = true) + @SerializedName("job_owner") + @JsonProperty(value = "job_owner", required = true) + public String owner = ""; + + @Schema(name = "job_definition", description = "Information type specific job data", required = true) + @SerializedName("job_definition") + @JsonProperty(value = "job_definition", required = true) + public Object jobDefinition; + + @Schema(name = "job_result_uri", description = "The target URI of the subscribed information", required = true) + @SerializedName("job_result_uri") + @JsonProperty(value = "job_result_uri", required = true) + public String jobResultUri = ""; + + @Schema(name = "status_notification_uri", + description = "The target of Information subscription job status notifications", required = false) + @SerializedName("status_notification_uri") + @JsonProperty(value = "status_notification_uri", required = false) + public String statusNotificationUri = ""; + + public ConsumerJobInfo() {} + + public ConsumerJobInfo(String infoTypeId, Object jobData, String owner, String targetUri, + String statusNotificationUri) { + this.infoTypeId = infoTypeId; + this.jobDefinition = jobData; + this.owner = owner; + this.jobResultUri = targetUri; + this.statusNotificationUri = statusNotificationUri; + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerInfoTypeInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerInfoTypeInfo.java new file mode 100644 index 00000000..1bf5e474 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerInfoTypeInfo.java @@ -0,0 +1,52 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "producer_info_type_info", description = "Information for an Information Type") +public class ProducerInfoTypeInfo { + + @Schema(name = "info_job_data_schema", description = "Json schema for the job data", required = true) + @SerializedName("info_job_data_schema") + @JsonProperty(value = "info_job_data_schema", required = true) + public Object jobDataSchema; + + @Schema(name = "info_type_information", description = "Type specific information for the information type", + required = true) + @SerializedName("info_type_information") + @JsonProperty(value = "info_type_information", required = true) + public Object typeSpecificInformation; + + public ProducerInfoTypeInfo(Object jobDataSchema, Object typeSpecificInformation) { + this.jobDataSchema = jobDataSchema; + this.typeSpecificInformation = typeSpecificInformation; + } + + public ProducerInfoTypeInfo() {} + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerJobInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerJobInfo.java new file mode 100644 index 00000000..d378825a --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerJobInfo.java @@ -0,0 +1,77 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import org.immutables.gson.Gson; + +@Gson.TypeAdapters +@Schema(name = "producer_info_job_request", + description = "The body of the Information Producer callbacks for Information Job creation and deletion") +public class ProducerJobInfo { + + @Schema(name = "info_job_identity", description = "Identity of the Information Job", required = true) + @SerializedName("info_job_identity") + @JsonProperty("info_job_identity") + public String id = ""; + + @Schema(name = "info_type_identity", description = "Type identity for the job") + @SerializedName("info_type_identity") + @JsonProperty("info_type_identity") + public String typeId = ""; + + @Schema(name = "info_job_data", description = "Json for the job data") + @SerializedName("info_job_data") + @JsonProperty("info_job_data") + public Object jobData; + + @Schema(name = "target_uri", description = "URI for the target of the produced Information") + @SerializedName("target_uri") + @JsonProperty("target_uri") + public String targetUri = ""; + + @Schema(name = "owner", description = "The owner of the job") + @SerializedName("owner") + @JsonProperty("owner") + public String owner = ""; + + @Schema(name = "last_updated", description = "The time when the job was last updated or created (ISO-8601)") + @SerializedName("last_updated") + @JsonProperty("last_updated") + public String lastUpdated = ""; + + public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner, + String lastUpdated) { + this.id = id; + this.jobData = jobData; + this.typeId = typeId; + this.targetUri = targetUri; + this.owner = owner; + this.lastUpdated = lastUpdated; + } + + public ProducerJobInfo() {} + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerRegistrationInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerRegistrationInfo.java new file mode 100644 index 00000000..e54c1522 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ProducerRegistrationInfo.java @@ -0,0 +1,64 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.r1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.annotations.SerializedName; + +import io.swagger.v3.oas.annotations.media.Schema; + +import java.util.Collection; + +import lombok.Builder; + +import org.immutables.gson.Gson; + +@Builder +@Gson.TypeAdapters +@Schema(name = "producer_registration_info", description = "Information for an Information Producer") +public class ProducerRegistrationInfo { + + @Schema(name = "supported_info_types", description = "Supported Information Type IDs", required = true) + @SerializedName("supported_info_types") + @JsonProperty(value = "supported_info_types", required = true) + public Collection supportedTypeIds; + + @Schema(name = "info_job_callback_url", description = "callback for Information Job", required = true) + @SerializedName("info_job_callback_url") + @JsonProperty(value = "info_job_callback_url", required = true) + public String jobCallbackUrl; + + @Schema(name = "info_producer_supervision_callback_url", description = "callback for producer supervision", + required = true) + @SerializedName("info_producer_supervision_callback_url") + @JsonProperty(value = "info_producer_supervision_callback_url", required = true) + public String producerSupervisionCallbackUrl; + + public ProducerRegistrationInfo(Collection types, String jobCallbackUrl, + String producerSupervisionCallbackUrl) { + this.supportedTypeIds = types; + this.jobCallbackUrl = jobCallbackUrl; + this.producerSupervisionCallbackUrl = producerSupervisionCallbackUrl; + } + + public ProducerRegistrationInfo() {} + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java new file mode 100644 index 00000000..d19577db --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -0,0 +1,38 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import lombok.Getter; + +public class InfoType { + + @Getter + private final String id; + + @Getter + private final String dmaapTopicUrl; + + public InfoType(String id, String dmaapTopicUrl) { + this.id = id; + this.dmaapTopicUrl = dmaapTopicUrl; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java new file mode 100644 index 00000000..b8677a37 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java @@ -0,0 +1,80 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfoTypes { + private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class); + + private Map allTypes = new HashMap<>(); + + public InfoTypes(Collection types) { + + for (InfoType type : types) { + put(type); + } + } + + public synchronized InfoType get(String id) { + return allTypes.get(id); + } + + public synchronized InfoType getType(String id) throws ServiceException { + InfoType type = allTypes.get(id); + if (type == null) { + throw new ServiceException("Could not find type: " + id); + } + return type; + } + + public static class ConfigFile { + Collection types; + } + + private synchronized void put(InfoType type) { + logger.debug("Put type: {}", type.getId()); + allTypes.put(type.getId(), type); + } + + public synchronized Iterable getAll() { + return new Vector<>(allTypes.values()); + } + + public synchronized Collection typeIds() { + return allTypes.keySet(); + } + + public synchronized int size() { + return allTypes.size(); + } + + public synchronized void clear() { + allTypes.clear(); + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java new file mode 100644 index 00000000..690e465b --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -0,0 +1,42 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import lombok.Getter; + +public class Job { + + @Getter + private final String id; + + @Getter + private final String callbackUrl; + + @Getter + private final InfoType type; + + public Job(String id, String callbackUrl, InfoType type) { + this.id = id; + this.callbackUrl = callbackUrl; + this.type = type; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java new file mode 100644 index 00000000..6e2b3265 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -0,0 +1,89 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class Jobs { + private static final Logger logger = LoggerFactory.getLogger(Jobs.class); + + private Map allJobs = new HashMap<>(); + private MultiMap jobsByType = new MultiMap<>(); + + public Jobs() {} + + public synchronized Job getJob(String id) throws ServiceException { + Job job = allJobs.get(id); + if (job == null) { + throw new ServiceException("Could not find job: " + id); + } + return job; + } + + public synchronized Job get(String id) { + return allJobs.get(id); + } + + public synchronized void put(Job job) { + logger.debug("Put service: {}", job.getId()); + allJobs.put(job.getId(), job); + jobsByType.put(job.getType().getId(), job.getId(), job); + } + + public synchronized Iterable getAll() { + return new Vector<>(allJobs.values()); + } + + public synchronized Job remove(String id) { + Job job = allJobs.get(id); + if (job != null) { + remove(job); + } + return job; + } + + public synchronized void remove(Job job) { + this.allJobs.remove(job.getId()); + jobsByType.remove(job.getType().getId(), job.getId()); + } + + public synchronized int size() { + return allJobs.size(); + } + + public synchronized Collection getJobsForType(InfoType type) { + return jobsByType.get(type.getId()); + } + + public synchronized void clear() { + allJobs.clear(); + jobsByType.clear(); + } +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java new file mode 100644 index 00000000..38f3d175 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java @@ -0,0 +1,65 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019-2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.repository; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; + +/** + * A map, where each key can be bound to may values (where each value has an own + * ID) + */ +public class MultiMap { + + private final Map> map = new HashMap<>(); + + public void put(String key, String id, T value) { + this.map.computeIfAbsent(key, k -> new HashMap<>()).put(id, value); + } + + public T remove(String key, String id) { + Map innerMap = this.map.get(key); + if (innerMap != null) { + T removedElement = innerMap.remove(id); + if (innerMap.isEmpty()) { + this.map.remove(key); + } + return removedElement; + } + return null; + } + + public Collection get(String key) { + Map innerMap = this.map.get(key); + if (innerMap == null) { + return Collections.emptyList(); + } + return new Vector<>(innerMap.values()); + } + + public void clear() { + this.map.clear(); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java new file mode 100644 index 00000000..fb5c891c --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -0,0 +1,135 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import java.time.Duration; + +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.Jobs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +/** + * The class fetches incoming requests from DMAAP and sends them further to the + * consumers that has a job for this InformationType. + */ + +public class DmaapMessageConsumer { + private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); + private final ApplicationConfig applicationConfig; + private final AsyncRestClient restClient; + private final InfoType type; + private final Jobs jobs; + private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); + + /** Submits new elements until stopped */ + private static class InfiniteFlux { + private FluxSink sink; + private int counter = 0; + + public synchronized Flux start() { + stop(); + return Flux.create(this::next).doOnRequest(this::onRequest); + } + + public synchronized void stop() { + if (this.sink != null) { + this.sink.complete(); + this.sink = null; + } + } + + void onRequest(long no) { + logger.debug("InfiniteFlux.onRequest {}", no); + for (long i = 0; i < no; ++i) { + sink.next(counter++); + } + } + + void next(FluxSink sink) { + logger.debug("InfiniteFlux.next"); + this.sink = sink; + sink.next(counter++); + } + } + + public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { + this.applicationConfig = applicationConfig; + AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + this.restClient = restclientFactory.createRestClientNoHttpProxy(""); + this.type = type; + this.jobs = jobs; + } + + public void start() { + infiniteSubmitter.stop(); + + createTask().subscribe(// + value -> logger.debug("DmaapMessageConsumer next: {}", value), // + throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // + () -> logger.warn("DmaapMessageConsumer stopped") // + ); + } + + protected Flux createTask() { + final int CONCURRENCY = 5; + return infiniteSubmitter.start() // + .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // + .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // + .flatMap(this::handleReceivedMessage, CONCURRENCY); + } + + private String getDmaapUrl() { + return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl(); + } + + private Mono handleErrorResponse(Throwable t) { + logger.debug("error from DMAAP {}", t.getMessage()); + return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // + .flatMap(notUsed -> Mono.empty()); + } + + protected Mono getFromMessageRouter(String topicUrl) { + logger.trace("getFromMessageRouter {}", topicUrl); + return restClient.get(topicUrl) // + .onErrorResume(this::handleErrorResponse); + } + + protected Flux handleReceivedMessage(String body) { + logger.debug("Received from DMAAP {}", body); + final int CONCURRENCY = 5; + + // Distibute the body to all jobs for this type + return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // + .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) + .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) // + .onErrorResume(this::handleErrorResponse); + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java new file mode 100644 index 00000000..b9a50b39 --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -0,0 +1,170 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import com.google.gson.JsonParser; + +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; +import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Registers the types and this producer in ECS. This is done when needed. + */ +@Component +@EnableScheduling +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +public class ProducerRegstrationTask { + + private static final Logger logger = LoggerFactory.getLogger(ProducerRegstrationTask.class); + private final AsyncRestClient restClient; + private final ApplicationConfig applicationConfig; + private final InfoTypes types; + private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + + private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; + private boolean isRegisteredInEcs = false; + private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; + + public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) { + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + this.restClient = restClientFactory.createRestClientNoHttpProxy(""); + this.applicationConfig = applicationConfig; + this.types = types; + } + + @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) + public void supervisionTask() { + logger.debug("Checking producers starting"); + createTask().subscribe(null, null, () -> logger.debug("Producer registration completed")); + } + + public Mono createTask() { + return checkProducerRegistration() // + .doOnError(t -> isRegisteredInEcs = false) // + .onErrorResume(t -> registerTypesAndProducer()); + } + + public boolean isRegisteredInEcs() { + return this.isRegisteredInEcs; + } + + private Mono checkProducerRegistration() { + final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + return restClient.get(url) // + .flatMap(this::checkRegistrationInfo) // + ; + } + + private String registerTypeUrl(InfoType type) { + String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); + return url; + } + + private Mono registerTypesAndProducer() { + final String producerUrl = + applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + + return Flux.fromIterable(this.types.getAll()) // + .doOnNext(type -> logger.info("Registering type {}", type.getId())) // + .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) // + .collectList() // + .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) // + .onErrorResume(t -> { + logger.warn("Registration failed {}", t.getMessage()); + isRegisteredInEcs = false; + return Mono.empty(); + }) // + .doOnNext(x -> logger.debug("Registering types and producer completed")); + } + + private Object typeSpecifcInfoObject() { + return jsonObject("{}"); + } + + private ProducerInfoTypeInfo typeRegistrationInfo() { + return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject()); + } + + private Object jsonSchemaObject() { + // An object with no properties + String schemaStr = "{" // + + "\"type\": \"object\"," // + + "\"properties\": {}," // + + "\"additionalProperties\": false" // + + "}"; // + return jsonObject(schemaStr); + } + + private Object jsonObject(String json) { + try { + return JsonParser.parseString(json).getAsJsonObject(); + } catch (Exception e) { + logger.error("Bug, error in JSON: {}", json); + throw new NullPointerException(e.toString()); + } + } + + private Mono checkRegistrationInfo(String resp) { + ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class); + if (isEqual(producerRegistrationInfo(), info)) { + logger.debug("Already registered"); + this.isRegisteredInEcs = true; + return Mono.empty(); + } else { + return Mono.error(new ServiceException("Producer registration will be started")); + } + } + + private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) { + return a.jobCallbackUrl.equals(b.jobCallbackUrl) // + && a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) // + && a.supportedTypeIds.size() == b.supportedTypeIds.size(); + } + + private ProducerRegistrationInfo producerRegistrationInfo() { + + return ProducerRegistrationInfo.builder() // + .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) // + .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) // + .supportedTypeIds(types.typeIds()) // + .build(); + } + + private String baseUrl() { + return this.applicationConfig.getSelfUrl(); + } +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java new file mode 100644 index 00000000..b2b6d96c --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -0,0 +1,231 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.google.gson.JsonParser; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.json.JSONObject; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; +import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) +@TestPropertySource(properties = { // + "server.ssl.key-store=./config/keystore.jks", // + "app.webclient.trust-store=./config/truststore.jks", // + "app.vardata-directory=./target", // + "app.configuration-filepath=./src/test/resources/test_application_configuration.json"// +}) +class ApplicationTest { + + @Autowired + private ApplicationConfig applicationConfig; + + @Autowired + private ProducerRegstrationTask producerRegstrationTask; + + @Autowired + private Jobs jobs; + + @Autowired + private InfoTypes types; + + @Autowired + private ConsumerController consumerController; + + @Autowired + private EcsSimulatorController ecsSimulatorController; + + @LocalServerPort + int localServerHttpPort; + + static class TestApplicationConfig extends ApplicationConfig { + @Override + public String getEcsBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getDmaapBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getSelfUrl() { + return thisProcessUrl(); + } + + private String thisProcessUrl() { + final String url = "https://localhost:" + getLocalServerHttpPort(); + return url; + } + } + + /** + * Overrides the BeanFactory. + */ + @TestConfiguration + static class TestBeanFactory extends BeanFactory { + + @Override + @Bean + public ServletWebServerFactory servletContainer() { + return new TomcatServletWebServerFactory(); + } + + @Override + @Bean + public ApplicationConfig getApplicationConfig() { + TestApplicationConfig cfg = new TestApplicationConfig(); + return cfg; + } + } + + @AfterEach + void reset() { + this.consumerController.testResults.reset(); + this.ecsSimulatorController.testResults.reset(); + this.jobs.clear(); + this.types.clear(); + } + + private AsyncRestClient restClient(boolean useTrustValidation) { + WebClientConfig config = this.applicationConfig.getWebClientConfig(); + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost("") // + .httpProxyPort(0) // + .build(); + config = ImmutableWebClientConfig.builder() // + .keyStoreType(config.keyStoreType()) // + .keyStorePassword(config.keyStorePassword()) // + .keyStore(config.keyStore()) // + .keyPassword(config.keyPassword()) // + .isTrustStoreUsed(useTrustValidation) // + .trustStore(config.trustStore()) // + .trustStorePassword(config.trustStorePassword()) // + .httpProxyConfig(httpProxyConfig).build(); + + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + return restClientFactory.createRestClientNoHttpProxy(baseUrl()); + } + + private AsyncRestClient restClient() { + return restClient(false); + } + + private String baseUrl() { + return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); + } + + private ConsumerJobInfo consumerJobInfo() { + InfoType type = this.types.getAll().iterator().next(); + return consumerJobInfo(type.getId(), "EI_JOB_ID"); + } + + private Object jsonObject() { + return jsonObject("{}"); + } + + private Object jsonObject(String json) { + try { + return JsonParser.parseString(json).getAsJsonObject(); + } catch (Exception e) { + throw new NullPointerException(e.toString()); + } + } + + private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) { + try { + String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; + return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, ""); + } catch (Exception e) { + return null; + } + } + + @Test + void generateApiDoc() throws IOException { + String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs"; + ResponseEntity resp = restClient().getForEntity(url).block(); + assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); + JSONObject jsonObj = new JSONObject(resp.getBody()); + assertThat(jsonObj.remove("servers")).isNotNull(); + + String indented = (jsonObj).toString(4); + String docDir = "api/"; + Files.createDirectories(Paths.get(docDir)); + try (PrintStream out = new PrintStream(new FileOutputStream(docDir + "api.json"))) { + out.print(indented); + } + } + + @Test + void testWholeChain() throws Exception { + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + + this.ecsSimulatorController.addJob(consumerJobInfo(), restClient()); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); + DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2)); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); + + } + +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java new file mode 100644 index 00000000..1dbe83f7 --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -0,0 +1,78 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.oran.dmaapadapter.controllers.VoidResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController("ConsumerSimulatorController") +@Tag(name = "Consts.PRODUCER_API_CALLBACKS_NAME") +public class ConsumerController { + + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String CONSUMER_TARGET_URL = "/consumer"; + + public static class TestResults { + + public List receivedBodies = Collections.synchronizedList(new ArrayList()); + + public TestResults() {} + + public void reset() { + receivedBodies.clear(); + } + } + + final TestResults testResults = new TestResults(); + + @PostMapping(path = CONSUMER_TARGET_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "GET from topic", description = "The call is invoked to push data to consumer") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity postData(@RequestBody String body) { + logger.info("Received by consumer: {}", body); + testResults.receivedBodies.add(body); + return new ResponseEntity<>(HttpStatus.OK); + } + +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java new file mode 100644 index 00000000..fbb600f2 --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java @@ -0,0 +1,73 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; + +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import org.oran.dmaapadapter.controllers.ErrorResponse; +import org.oran.dmaapadapter.controllers.VoidResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController("ProducerSimulatorController") +@Tag(name = "ProducerConsts.PRODUCER_API_CALLBACKS_NAME") +public class DmaapSimulatorController { + + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1"; + + public static List dmaapResponses = Collections.synchronizedList(new LinkedList()); + + @GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "GET from topic", + description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.") + @ApiResponses(value = { // + @ApiResponse(responseCode = "200", description = "OK", // + content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + }) + public ResponseEntity getFromTopic() { + if (dmaapResponses.isEmpty()) { + return ErrorResponse.create("", HttpStatus.NOT_FOUND); + } else { + String resp = dmaapResponses.remove(0); + logger.info("DMAAP simulator returned: {}", resp); + return new ResponseEntity<>(resp, HttpStatus.OK); + } + + } + +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java new file mode 100644 index 00000000..c0420342 --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java @@ -0,0 +1,109 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import io.swagger.v3.oas.annotations.tags.Tag; + +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; + +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; +import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; +import org.oran.dmaapadapter.r1.ProducerJobInfo; +import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController("EcsSimulatorController") +@Tag(name = "EcsSimulator") +public class EcsSimulatorController { + + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final static Gson gson = new GsonBuilder().create(); + + public static class TestResults { + + ProducerRegistrationInfo registrationInfo; + Map types = new HashMap<>(); + + public TestResults() {} + + public void reset() { + registrationInfo = null; + types.clear(); + } + } + + final TestResults testResults = new TestResults(); + public static final String API_ROOT = "/data-producer/v1"; + + @GetMapping(path = API_ROOT + "/info-producers/{infoProducerId}", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity getInfoProducer( // + @PathVariable("infoProducerId") String infoProducerId) { + + if (testResults.registrationInfo != null) { + return new ResponseEntity<>(gson.toJson(testResults.registrationInfo), HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + + } + + @PutMapping(path = API_ROOT + "/info-producers/{infoProducerId}", // + produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity putInfoProducer( // + @PathVariable("infoProducerId") String infoProducerId, // + @RequestBody ProducerRegistrationInfo registrationInfo) { + testResults.registrationInfo = registrationInfo; + return new ResponseEntity<>(HttpStatus.OK); + } + + @PutMapping(path = API_ROOT + "/info-types/{infoTypeId}", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity putInfoType( // + @PathVariable("infoTypeId") String infoTypeId, // + @RequestBody ProducerInfoTypeInfo registrationInfo) { + testResults.types.put(infoTypeId, registrationInfo); + return new ResponseEntity<>(HttpStatus.OK); + } + + public void addJob(ConsumerJobInfo job, AsyncRestClient restClient) { + String url = this.testResults.registrationInfo.jobCallbackUrl; + ProducerJobInfo request = + new ProducerJobInfo(job.jobDefinition, "ID", job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP"); + String body = gson.toJson(request); + logger.info("ECS Simulator PUT job: {}", body); + restClient.post(url, body).block(); + + } +} diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java new file mode 100644 index 00000000..f9ead03a --- /dev/null +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java @@ -0,0 +1,224 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; +import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig; +import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) +@TestPropertySource(properties = { // + "server.ssl.key-store=./config/keystore.jks", // + "app.webclient.trust-store=./config/truststore.jks", // + "app.vardata-directory=./target", // + "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // + "app.ecs-base-url=https://localhost:8434" // +}) +class IntegrationWithEcs { + + @Autowired + private ApplicationConfig applicationConfig; + + @Autowired + private ProducerRegstrationTask producerRegstrationTask; + + @Autowired + private Jobs jobs; + + @Autowired + private InfoTypes types; + + @Autowired + private ConsumerController consumerController; + + private static Gson gson = new GsonBuilder().create(); + + static class TestApplicationConfig extends ApplicationConfig { + + @Override + public String getEcsBaseUrl() { + return "https://localhost:8434"; + } + + @Override + public String getDmaapBaseUrl() { + return thisProcessUrl(); + } + + @Override + public String getSelfUrl() { + return thisProcessUrl(); + } + + private String thisProcessUrl() { + final String url = "https://localhost:" + getLocalServerHttpPort(); + return url; + } + } + + /** + * Overrides the BeanFactory. + */ + @TestConfiguration + static class TestBeanFactory extends BeanFactory { + + @Override + @Bean + public ServletWebServerFactory servletContainer() { + return new TomcatServletWebServerFactory(); + } + + @Override + @Bean + public ApplicationConfig getApplicationConfig() { + TestApplicationConfig cfg = new TestApplicationConfig(); + return cfg; + } + } + + @AfterEach + void reset() { + this.consumerController.testResults.reset(); + this.jobs.clear(); + this.types.clear(); + } + + private AsyncRestClient restClient(boolean useTrustValidation) { + WebClientConfig config = this.applicationConfig.getWebClientConfig(); + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost("") // + .httpProxyPort(0) // + .build(); + config = ImmutableWebClientConfig.builder() // + .keyStoreType(config.keyStoreType()) // + .keyStorePassword(config.keyStorePassword()) // + .keyStore(config.keyStore()) // + .keyPassword(config.keyPassword()) // + .isTrustStoreUsed(useTrustValidation) // + .trustStore(config.trustStore()) // + .trustStorePassword(config.trustStorePassword()) // + .httpProxyConfig(httpProxyConfig).build(); + + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + return restClientFactory.createRestClientNoHttpProxy(selfBaseUrl()); + } + + private AsyncRestClient restClient() { + return restClient(false); + } + + private String selfBaseUrl() { + return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); + } + + private String ecsBaseUrl() { + return applicationConfig.getEcsBaseUrl(); + } + + private void createInformationJobInEcs() { + String url = ecsBaseUrl() + "/data-consumer/v1/info-jobs/jobId"; + String body = gson.toJson(consumerJobInfo()); + try { + // Delete the job if it already exists + restClient().delete(url).block(); + } catch (Exception e) { + } + restClient().putForEntity(url, body).block(); + } + + private ConsumerJobInfo consumerJobInfo() { + InfoType type = this.types.getAll().iterator().next(); + return consumerJobInfo(type.getId(), "EI_JOB_ID"); + } + + private Object jsonObject() { + return jsonObject("{}"); + } + + private Object jsonObject(String json) { + try { + return JsonParser.parseString(json).getAsJsonObject(); + } catch (Exception e) { + throw new NullPointerException(e.toString()); + } + } + + private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) { + try { + String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL; + return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, ""); + } catch (Exception e) { + return null; + } + } + + @Test + void testWholeChain() throws Exception { + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + + createInformationJobInEcs(); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); + DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); + + ConsumerController.TestResults results = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2)); + assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); + + synchronized (this) { + // logger.warn("**************** Keeping server alive! " + + // this.applicationConfig.getLocalServerHttpPort()); + // this.wait(); + } + } + +} diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json new file mode 100644 index 00000000..0db20cb3 --- /dev/null +++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json @@ -0,0 +1,8 @@ +{ + "types": [ + { + "id": "ExampleInformationType", + "dmaapTopicUrl": "/dmaap-topic-1" + } + ] +} \ No newline at end of file diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore index 567963e6..0d08f66c 100644 --- a/dmaap-mediator-producer/.gitignore +++ b/dmaap-mediator-producer/.gitignore @@ -3,3 +3,4 @@ coverage.* main dmaapmediatorproducer +__debug_bin* diff --git a/dmaap-mediator-producer/build-dmaapmediatorproducer-ubuntu.sh b/dmaap-mediator-producer/build-dmaapmediatorproducer-ubuntu.sh index ad8b8bb4..da5f3422 100755 --- a/dmaap-mediator-producer/build-dmaapmediatorproducer-ubuntu.sh +++ b/dmaap-mediator-producer/build-dmaapmediatorproducer-ubuntu.sh @@ -23,6 +23,8 @@ curdir=`pwd` # go installs tools like go-acc to $HOME/go/bin # ubuntu minion path lacks go export PATH=$PATH:/usr/local/go/bin:$HOME/go/bin +go version +cd dmaap-mediator-producer # install the go coverage tool helper go get -v github.com/ory/go-acc @@ -32,4 +34,5 @@ go get github.com/stretchr/testify/mock@v1.7.0 go-acc ./... --ignore mocks +cp coverage.txt $curdir echo "--> build-dmaapmediatorproducer-ubuntu.sh ends" diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 6969f9fa..3616c584 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -22,19 +22,34 @@ package config import ( "os" + "strconv" + + log "github.com/sirupsen/logrus" ) type Config struct { - LogLevel string - JobResultUri string - InfoCoordinatorAddress string + LogLevel string + InfoProducerSupervisionCallbackHost string + InfoProducerSupervisionCallbackPort int + InfoJobCallbackHost string + InfoJobCallbackPort int + InfoCoordinatorAddress string +} + +type ProducerRegistrationInfo struct { + InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"` + SupportedInfoTypes []string `json:"supported_info_types"` + InfoJobCallbackUrl string `json:"info_job_callback_url"` } func New() *Config { return &Config{ - LogLevel: getEnv("LOG_LEVEL", "Info"), - JobResultUri: getEnv("JOB_RESULT_URI", ""), - InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), + LogLevel: getEnv("LOG_LEVEL", "Info"), + InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""), + InfoProducerSupervisionCallbackPort: getEnvAsInt("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", 8085), + InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""), + InfoJobCallbackPort: getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086), + InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), } } @@ -45,3 +60,14 @@ func getEnv(key string, defaultVal string) string { return defaultVal } + +func getEnvAsInt(name string, defaultVal int) int { + valueStr := getEnv(name, "") + if value, err := strconv.Atoi(valueStr); err == nil { + return value + } else if valueStr != "" { + log.Warnf("Invalid int value: %v for variable: %v. Default value: %v will be used", valueStr, name, defaultVal) + } + + return defaultVal +} diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index f0106d0f..4a65dc0d 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -21,31 +21,71 @@ package config import ( + "bytes" "os" "reflect" "testing" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" ) func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Setenv("LOG_LEVEL", "Debug") - os.Setenv("JOB_RESULT_URI", "testUrl") - os.Setenv("INFO_COORD_ADDR", "testAddr") - defer os.Clearenv() + os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost") + os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8095") + os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost") + os.Setenv("INFO_JOB_CALLBACK_PORT", "8096") + os.Setenv("INFO_COORD_ADDR", "infoCoordAddr") + t.Cleanup(func() { + os.Clearenv() + }) + wantConfig := Config{ + LogLevel: "Debug", + InfoProducerSupervisionCallbackHost: "supervisionCallbackHost", + InfoProducerSupervisionCallbackPort: 8095, + InfoJobCallbackHost: "jobCallbackHost", + InfoJobCallbackPort: 8096, + InfoCoordinatorAddress: "infoCoordAddr", + } + if got := New(); !reflect.DeepEqual(got, &wantConfig) { + t.Errorf("New() = %v, want %v", got, &wantConfig) + } +} + +func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) { + assertions := require.New(t) + var buf bytes.Buffer + log.SetOutput(&buf) + + os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "wrong") + t.Cleanup(func() { + log.SetOutput(os.Stderr) + os.Clearenv() + }) wantConfig := Config{ - LogLevel: "Debug", - JobResultUri: "testUrl", - InfoCoordinatorAddress: "testAddr", + LogLevel: "Info", + InfoProducerSupervisionCallbackHost: "", + InfoProducerSupervisionCallbackPort: 8085, + InfoJobCallbackHost: "", + InfoJobCallbackPort: 8086, + InfoCoordinatorAddress: "http://enrichmentservice:8083", } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) } + logString := buf.String() + assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_SUPERVISION_CALLBACK_PORT. Default value: 8085 will be used") } func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) { wantConfig := Config{ - LogLevel: "Info", - JobResultUri: "", - InfoCoordinatorAddress: "http://enrichmentservice:8083", + LogLevel: "Info", + InfoProducerSupervisionCallbackHost: "", + InfoProducerSupervisionCallbackPort: 8085, + InfoJobCallbackHost: "", + InfoJobCallbackPort: 8086, + InfoCoordinatorAddress: "http://enrichmentservice:8083", } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go index f846f9f5..37225eda 100644 --- a/dmaap-mediator-producer/internal/config/registrator.go +++ b/dmaap-mediator-producer/internal/config/registrator.go @@ -21,19 +21,22 @@ package config import ( + "encoding/json" "fmt" "net/url" log "github.com/sirupsen/logrus" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) const registerTypePath = "/data-producer/v1/info-types/" +const registerProducerPath = "/data-producer/v1/info-producers/" type Registrator interface { - RegisterTypes(types []*jobtypes.Type) error + RegisterTypes(types []*jobs.Type) error + RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) } type RegistratorImpl struct { @@ -46,13 +49,25 @@ func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl { } } -func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error { +func (r RegistratorImpl) RegisterTypes(jobTypes []*jobs.Type) error { for _, jobType := range jobTypes { body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema) - if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Name), []byte(body)); error != nil { + if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil { return error } log.Debugf("Registered type: %v", jobType) } return nil } + +func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error { + if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil { + if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil { + return putErr + } + log.Debugf("Registered producer: %v", producerId) + return nil + } else { + return marshalErr + } +} diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go index d3dd3a0c..a89c43f8 100644 --- a/dmaap-mediator-producer/internal/config/registrator_test.go +++ b/dmaap-mediator-producer/internal/config/registrator_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) @@ -43,11 +43,11 @@ func TestRegisterTypes(t *testing.T) { restclient.Client = &clientMock - type1 := jobtypes.Type{ - Name: "Type1", + type1 := jobs.Type{ + TypeId: "Type1", Schema: `{"title": "Type 1"}`, } - types := []*jobtypes.Type{&type1} + types := []*jobs.Type{&type1} r := NewRegistratorImpl("http://localhost:9990") err := r.RegisterTypes(types) @@ -68,3 +68,40 @@ func TestRegisterTypes(t *testing.T) { assertions.Equal(expectedBody, body) clientMock.AssertNumberOfCalls(t, "Do", 1) } + +func TestRegisterProducer(t *testing.T) { + assertions := require.New(t) + + clientMock := mocks.HTTPClient{} + + clientMock.On("Do", mock.Anything).Return(&http.Response{ + StatusCode: http.StatusCreated, + }, nil) + + restclient.Client = &clientMock + + producer := ProducerRegistrationInfo{ + InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl", + SupportedInfoTypes: []string{"type1"}, + InfoJobCallbackUrl: "jobCallbackUrl", + } + + r := NewRegistratorImpl("http://localhost:9990") + err := r.RegisterProducer("Producer1", &producer) + + assertions.Nil(err) + var actualRequest *http.Request + clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { + actualRequest = req + return true + })) + assertions.Equal(http.MethodPut, actualRequest.Method) + assertions.Equal("http", actualRequest.URL.Scheme) + assertions.Equal("localhost:9990", actualRequest.URL.Host) + assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path) + assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + body, _ := ioutil.ReadAll(actualRequest.Body) + expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`) + assertions.Equal(expectedBody, body) + clientMock.AssertNumberOfCalls(t, "Do", 1) +} diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go new file mode 100644 index 00000000..73471789 --- /dev/null +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -0,0 +1,140 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package jobs + +import ( + "fmt" + "os" + "path/filepath" + "strings" +) + +type Type struct { + TypeId string + Schema string +} + +type JobInfo struct { + Owner string `json:"owner"` + LastUpdated string `json:"last_updated"` + InfoJobIdentity string `json:"info_job_identity"` + TargetUri string `json:"target_uri"` + InfoJobData string `json:"info_job_data"` + InfoTypeIdentity string `json:"info_type_identity"` +} + +type JobHandler interface { + AddJob(JobInfo) error +} + +var ( + typeDir = "configs" + Handler JobHandler + allJobs = make(map[string]map[string]JobInfo) +) + +func init() { + Handler = newJobHandlerImpl() +} + +type jobHandlerImpl struct{} + +func newJobHandlerImpl() *jobHandlerImpl { + return &jobHandlerImpl{} +} + +func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { + if err := validateJobInfo(ji); err == nil { + jobs := allJobs[ji.InfoTypeIdentity] + jobs[ji.InfoJobIdentity] = ji + return nil + } else { + return err + } +} + +func validateJobInfo(ji JobInfo) error { + if _, ok := allJobs[ji.InfoTypeIdentity]; !ok { + return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) + } + if ji.InfoJobIdentity == "" { + return fmt.Errorf("missing required job identity: %v", ji) + } + // Temporary for when there are only REST callbacks needed + if ji.TargetUri == "" { + return fmt.Errorf("missing required target URI: %v", ji) + } + return nil +} + +func GetTypes() ([]*Type, error) { + types := make([]*Type, 0, 1) + err := filepath.Walk(typeDir, + func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if strings.Contains(path, ".json") { + if jobType, err := getType(path); err == nil { + types = append(types, jobType) + } + } + return nil + }) + if err != nil { + return nil, err + } + return types, nil +} + +func GetSupportedTypes() []string { + supportedTypes := []string{} + for k := range allJobs { + supportedTypes = append(supportedTypes, k) + } + return supportedTypes +} + +func AddJob(job JobInfo) error { + return Handler.AddJob(job) +} + +func getType(path string) (*Type, error) { + fileName := filepath.Base(path) + typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) + + if typeSchema, err := os.ReadFile(path); err == nil { + typeInfo := Type{ + TypeId: typeName, + Schema: string(typeSchema), + } + if _, ok := allJobs[typeName]; !ok { + allJobs[typeName] = make(map[string]JobInfo) + } + return &typeInfo, nil + } else { + return nil, err + } +} + +func clearAll() { + allJobs = make(map[string]map[string]JobInfo) +} diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go new file mode 100644 index 00000000..09410338 --- /dev/null +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -0,0 +1,120 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package jobs + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +const type1Schema = `{"title": "Type 1"}` + +func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { + assertions := require.New(t) + typesDir, err := os.MkdirTemp("", "configs") + if err != nil { + t.Errorf("Unable to create temporary directory for types due to: %v", err) + } + t.Cleanup(func() { + os.RemoveAll(typesDir) + clearAll() + }) + typeDir = typesDir + fname := filepath.Join(typesDir, "type1.json") + if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil { + t.Errorf("Unable to create temporary files for types due to: %v", err) + } + types, err := GetTypes() + wantedType := Type{ + TypeId: "type1", + Schema: type1Schema, + } + wantedTypes := []*Type{&wantedType} + assertions.EqualValues(wantedTypes, types) + assertions.Nil(err) + + supportedTypes := GetSupportedTypes() + assertions.EqualValues([]string{"type1"}, supportedTypes) +} + +func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { + assertions := require.New(t) + allJobs["type1"] = make(map[string]JobInfo) + t.Cleanup(func() { + clearAll() + }) + jobInfo := JobInfo{ + Owner: "owner", + LastUpdated: "now", + InfoJobIdentity: "job1", + TargetUri: "target", + InfoJobData: "{}", + InfoTypeIdentity: "type1", + } + + err := AddJob(jobInfo) + assertions.Nil(err) + assertions.Equal(1, len(allJobs["type1"])) + assertions.Equal(jobInfo, allJobs["type1"]["job1"]) +} + +func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { + assertions := require.New(t) + jobInfo := JobInfo{ + InfoTypeIdentity: "type1", + } + + err := AddJob(jobInfo) + assertions.NotNil(err) + assertions.Equal("type not supported: type1", err.Error()) +} + +func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { + assertions := require.New(t) + allJobs["type1"] = make(map[string]JobInfo) + t.Cleanup(func() { + clearAll() + }) + jobInfo := JobInfo{ + InfoTypeIdentity: "type1", + } + + err := AddJob(jobInfo) + assertions.NotNil(err) + assertions.Equal("missing required job identity: { type1}", err.Error()) +} + +func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { + assertions := require.New(t) + allJobs["type1"] = make(map[string]JobInfo) + jobInfo := JobInfo{ + InfoTypeIdentity: "type1", + InfoJobIdentity: "job1", + } + + err := AddJob(jobInfo) + assertions.NotNil(err) + assertions.Equal("missing required target URI: { job1 type1}", err.Error()) + clearAll() +} diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go deleted file mode 100644 index 14d837de..00000000 --- a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go +++ /dev/null @@ -1,68 +0,0 @@ -// - -// ========================LICENSE_START================================= -// O-RAN-SC -// %% -// Copyright (C) 2021: Nordix Foundation -// %% -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ========================LICENSE_END=================================== -// - -package jobtypes - -import ( - "os" - "path/filepath" - "strings" -) - -type Type struct { - Name string - Schema string -} - -var typeDir = "configs" - -func GetTypes() ([]*Type, error) { - types := make([]*Type, 0, 1) - err := filepath.Walk(typeDir, - func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if strings.Contains(path, ".json") { - if jobType, err := getType(path); err == nil { - types = append(types, jobType) - } - } - return nil - }) - if err != nil { - return nil, err - } - return types, nil -} - -func getType(path string) (*Type, error) { - fileName := filepath.Base(path) - typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) - - if typeSchema, err := os.ReadFile(path); err == nil { - return &Type{ - Name: typeName, - Schema: string(typeSchema), - }, nil - } else { - return nil, err - } -} diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go deleted file mode 100644 index 5fdc378b..00000000 --- a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// - -// ========================LICENSE_START================================= -// O-RAN-SC -// %% -// Copyright (C) 2021: Nordix Foundation -// %% -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ========================LICENSE_END=================================== -// - -package jobtypes - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" -) - -const type1Schema = `{"title": "Type 1"}` - -func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) { - assertions := require.New(t) - typesDir, err := os.MkdirTemp("", "configs") - if err != nil { - t.Errorf("Unable to create temporary directory for types due to: %v", err) - } - defer os.RemoveAll(typesDir) - typeDir = typesDir - fname := filepath.Join(typesDir, "type1.json") - if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil { - t.Errorf("Unable to create temporary files for types due to: %v", err) - } - types, err := GetTypes() - wantedType := Type{ - Name: "type1", - Schema: type1Schema, - } - wantedTypes := []*Type{&wantedType} - assertions.EqualValues(wantedTypes, types) - assertions.Nil(err) -} diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go index c6eb24c7..78a02b6f 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go @@ -94,6 +94,7 @@ func isResponseSuccess(statusCode int) bool { } func getRequestError(response *http.Response) RequestError { + defer response.Body.Close() responseData, _ := io.ReadAll(response.Body) putError := RequestError{ StatusCode: response.StatusCode, diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go new file mode 100644 index 00000000..c3a1331c --- /dev/null +++ b/dmaap-mediator-producer/internal/server/server.go @@ -0,0 +1,81 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package server + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" +) + +func StatusHandler(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.Error(w, "404 not found.", http.StatusNotFound) + return + } + + if r.Method != "GET" { + http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed) + return + } + + fmt.Fprintf(w, "All is well!") +} + +func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/producer_simulator/info_job" { + http.Error(w, "404 not found.", http.StatusNotFound) + return + } + + if r.Method != "POST" { + http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed) + return + } + + b, readErr := ioutil.ReadAll(r.Body) + if readErr != nil { + http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest) + return + } + jobInfo := jobs.JobInfo{} + if unmarshalErr := json.Unmarshal(b, &jobInfo); unmarshalErr != nil { + http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest) + return + } + if err := jobs.AddJob(jobInfo); err != nil { + http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest) + } +} + +func CreateServer(port int, handlerFunc func(http.ResponseWriter, *http.Request)) *http.Server { + + mux := http.NewServeMux() + mux.HandleFunc("/", handlerFunc) + server := http.Server{ + Addr: fmt.Sprintf(":%v", port), // :{port} + Handler: mux, + } + return &server +} diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go new file mode 100644 index 00000000..d221c933 --- /dev/null +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -0,0 +1,178 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package server + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks" +) + +func TestStatusHandler(t *testing.T) { + assertions := require.New(t) + type args struct { + responseRecorder *httptest.ResponseRecorder + r *http.Request + } + tests := []struct { + name string + args args + wantedStatus int + wantedBody string + }{ + { + name: "StatusHandler with correct path and method, should return OK", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("GET", "/", nil, t), + }, + wantedStatus: http.StatusOK, + wantedBody: "All is well!", + }, + { + name: "StatusHandler with incorrect path, should return NotFound", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("GET", "/wrong", nil, t), + }, + wantedStatus: http.StatusNotFound, + wantedBody: "404 not found.\n", + }, + { + name: "StatusHandler with incorrect method, should return MethodNotAllowed", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("PUT", "/", nil, t), + }, + wantedStatus: http.StatusMethodNotAllowed, + wantedBody: "Method is not supported.\n", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(StatusHandler) + handler.ServeHTTP(tt.args.responseRecorder, tt.args.r) + assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code) + + assertions.Equal(tt.wantedBody, tt.args.responseRecorder.Body.String()) + }) + } +} + +func TestCreateInfoJobHandler(t *testing.T) { + assertions := require.New(t) + jobHandlerMock := mocks.JobHandler{} + + goodJobInfo := jobs.JobInfo{ + Owner: "owner", + LastUpdated: "now", + InfoJobIdentity: "jobId", + TargetUri: "target", + InfoJobData: "{}", + InfoTypeIdentity: "type", + } + badJobInfo := jobs.JobInfo{ + Owner: "bad", + } + jobHandlerMock.On("AddJob", goodJobInfo).Return(nil) + jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error")) + jobs.Handler = &jobHandlerMock + + type args struct { + responseRecorder *httptest.ResponseRecorder + r *http.Request + } + tests := []struct { + name string + args args + wantedStatus int + wantedBody string + }{ + { + name: "CreateInfoJobHandler with correct path and method, should return OK", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("POST", "/producer_simulator/info_job", &goodJobInfo, t), + }, + wantedStatus: http.StatusOK, + wantedBody: "", + }, + { + name: "CreateInfoJobHandler with incorrect job info, should return BadRequest", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("POST", "/producer_simulator/info_job", &badJobInfo, t), + }, + wantedStatus: http.StatusBadRequest, + wantedBody: "Invalid job info. Cause: error", + }, + { + name: "CreateInfoJobHandler with incorrect path, should return NotFound", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("GET", "/wrong", nil, t), + }, + wantedStatus: http.StatusNotFound, + wantedBody: "404 not found.", + }, + { + name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("PUT", "/producer_simulator/info_job", nil, t), + }, + wantedStatus: http.StatusMethodNotAllowed, + wantedBody: "Method is not supported.", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(CreateInfoJobHandler) + handler.ServeHTTP(tt.args.responseRecorder, tt.args.r) + assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code) + + assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody) + }) + } +} + +func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request { + var body io.Reader + if jobInfo != nil { + bodyAsBytes, _ := json.Marshal(jobInfo) + body = ioutil.NopCloser(bytes.NewReader(bodyAsBytes)) + } + if req, err := http.NewRequest(method, url, body); err == nil { + return req + } else { + t.Fatalf("Could not create request due to: %v", err) + return nil + } +} diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 240bdbd5..3fe92dca 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -21,14 +21,18 @@ package main import ( - "time" + "fmt" + "sync" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/server" ) var configuration *config.Config +var supervisionCallbackAddress string +var jobInfoCallbackAddress string func init() { configuration = config.New() @@ -39,21 +43,55 @@ func init() { } log.Debug("Initializing DMaaP Mediator Producer") - if configuration.JobResultUri == "" { - log.Fatal("Missing JOB_RESULT_URI") + if configuration.InfoProducerSupervisionCallbackHost == "" { + log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST") } + supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort) + + if configuration.InfoJobCallbackHost == "" { + log.Fatal("Missing INFO_JOB_CALLBACK_HOST") + } + jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort) registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress) - if types, err := jobtypes.GetTypes(); err == nil { + if types, err := jobs.GetTypes(); err == nil { if regErr := registrator.RegisterTypes(types); regErr != nil { log.Fatalf("Unable to register all types due to: %v", regErr) } } else { log.Fatalf("Unable to get types to register due to: %v", err) } + producer := config.ProducerRegistrationInfo{ + InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress, + SupportedInfoTypes: jobs.GetSupportedTypes(), + InfoJobCallbackUrl: jobInfoCallbackAddress, + } + if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { + log.Fatalf("Unable to register producer due to: %v", err) + } } func main() { log.Debug("Starting DMaaP Mediator Producer") - time.Sleep(1000 * time.Millisecond) + wg := new(sync.WaitGroup) + + // add two goroutines to `wg` WaitGroup, one for each avilable server + wg.Add(2) + + log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort) + go func() { + server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler) + log.Warn(server.ListenAndServe()) + wg.Done() + }() + + go func() { + server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler) + log.Warn(server.ListenAndServe()) + wg.Done() + }() + + // wait until WaitGroup is done + wg.Wait() + log.Debug("Stopping DMaaP Mediator Producer") } diff --git a/dmaap-mediator-producer/mocks/JobHandler.go b/dmaap-mediator-producer/mocks/JobHandler.go new file mode 100644 index 00000000..4914e4d7 --- /dev/null +++ b/dmaap-mediator-producer/mocks/JobHandler.go @@ -0,0 +1,27 @@ +// Code generated by mockery v2.9.3. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" +) + +// JobHandler is an autogenerated mock type for the JobHandler type +type JobHandler struct { + mock.Mock +} + +// AddJob provides a mock function with given fields: _a0 +func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(jobs.JobInfo) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/enrichment-coordinator-service/api/ecs-api.json b/enrichment-coordinator-service/api/ecs-api.json index 982633ba..0e8164ef 100644 --- a/enrichment-coordinator-service/api/ecs-api.json +++ b/enrichment-coordinator-service/api/ecs-api.json @@ -207,10 +207,7 @@ "producer_info_type_info": { "description": "Information for an Information Type", "type": "object", - "required": [ - "info_job_data_schema", - "info_type_information" - ], + "required": ["info_job_data_schema"], "properties": { "info_type_information": { "description": "Type specific information for the information type", diff --git a/enrichment-coordinator-service/api/ecs-api.yaml b/enrichment-coordinator-service/api/ecs-api.yaml index 1900215a..3445edee 100644 --- a/enrichment-coordinator-service/api/ecs-api.yaml +++ b/enrichment-coordinator-service/api/ecs-api.yaml @@ -1171,7 +1171,6 @@ components: producer_info_type_info: required: - info_job_data_schema - - info_type_information type: object properties: info_type_information: diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerInfoTypeInfo.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerInfoTypeInfo.java index b0375c24..311b8a7d 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerInfoTypeInfo.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerInfoTypeInfo.java @@ -39,9 +39,9 @@ public class ProducerInfoTypeInfo { @Schema( name = "info_type_information", description = "Type specific information for the information type", - required = true) + required = false) @SerializedName("info_type_information") - @JsonProperty(value = "info_type_information", required = true) + @JsonProperty(value = "info_type_information", required = false) public Object typeSpecificInformation; public ProducerInfoTypeInfo(Object jobDataSchema, Object typeSpecificInformation) { diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index 60fe35f3..2d6da4f5 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -99,8 +99,7 @@ public class InfoTypeSubscriptions { } public synchronized void put(SubscriptionInfo subscription) { - allSubscriptions.put(subscription.getId(), subscription); - subscriptionsByOwner.put(subscription.owner, subscription.id, subscription); + doPut(subscription); storeInFile(subscription); logger.debug("Added type status subscription {}", subscription.id); } @@ -259,10 +258,15 @@ public class InfoTypeSubscriptions { for (File file : dbDir.listFiles()) { String json = Files.readString(file.toPath()); SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class); - this.allSubscriptions.put(subscription.getId(), subscription); + doPut(subscription); } } + private void doPut(SubscriptionInfo subscription) { + allSubscriptions.put(subscription.getId(), subscription); + subscriptionsByOwner.put(subscription.owner, subscription.id, subscription); + } + private File getFile(SubscriptionInfo subscription) { return getPath(subscription).toFile(); } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java index e3089f92..85664f6b 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypes.java @@ -126,7 +126,7 @@ public class InfoTypes { out.print(gson.toJson(type)); } } catch (Exception e) { - logger.warn("Could not save job: {} {}", type.getId(), e.getMessage()); + logger.warn("Could not save type: {} {}", type.getId(), e.getMessage()); } } diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java index fa5b7e31..5292d90e 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java @@ -933,6 +933,7 @@ class ApplicationTest { InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig); assertThat(restoredSubscriptions.size()).isEqualTo(1); + assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1); // Delete the subscription restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block(); diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java index 76063114..47df78a2 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/MockEnrichmentService.java @@ -35,7 +35,7 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; @TestPropertySource( properties = { // "server.ssl.key-store=./config/keystore.jks", // - "app.webclient.trust-store=./config/truststore.jks"}) + "app.webclient.trust-store=./config/truststore.jks", "app.vardata-directory=./target"}) @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock. class MockEnrichmentService { private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); diff --git a/onap/oran b/onap/oran index 2d34af55..65cf80bb 160000 --- a/onap/oran +++ b/onap/oran @@ -1 +1 @@ -Subproject commit 2d34af551ed404a43afdfebb63b2a1b426722071 +Subproject commit 65cf80bbd2eba0ab60d174c04f981ff6cf929e36 diff --git a/pom.xml b/pom.xml index ee840451..e0665a11 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ enrichment-coordinator-service r-app-catalogue helm-manager + dmaap-adaptor-java