From 960e66a1728c1c332f6b74320bbd086a442ba5ea Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Tue, 26 Oct 2021 09:46:11 +0200 Subject: [PATCH 1/1] NONRTRIC - Implement DMaaP mediator producer service in Java Added API documentation. Added some unittest. Improved some logging. Simplified the registration task somewhat. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: I2afa2fbf1073459cf560212f2d2e601446352a25 --- dmaap-adaptor-java/api/api.json | 399 +++++++++++++++++ dmaap-adaptor-java/api/api.yaml | 471 +++++++++++++++++++++ dmaap-adaptor-java/config/application.yaml | 3 +- dmaap-adaptor-java/pom.xml | 4 +- .../java/org/oran/dmaapadapter/SwaggerConfig.java | 43 ++ .../controllers/ProducerCallbacksController.java | 30 +- .../java/org/oran/dmaapadapter/repository/Job.java | 10 +- .../tasks/ProducerRegstrationTask.java | 70 +-- .../org/oran/dmaapadapter/ApplicationTest.java | 27 +- .../org/oran/dmaapadapter/ConsumerController.java | 4 +- .../dmaapadapter/DmaapSimulatorController.java | 4 +- .../oran/dmaapadapter/EcsSimulatorController.java | 9 +- 12 files changed, 1023 insertions(+), 51 deletions(-) create mode 100644 dmaap-adaptor-java/api/api.json create mode 100644 dmaap-adaptor-java/api/api.yaml create mode 100644 dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java diff --git a/dmaap-adaptor-java/api/api.json b/dmaap-adaptor-java/api/api.json new file mode 100644 index 00000000..39056e91 --- /dev/null +++ b/dmaap-adaptor-java/api/api.json @@ -0,0 +1,399 @@ +{ + "components": {"schemas": { + "producer_info_job_request": { + "description": "The body of the Information Producer callbacks for Information Job creation and deletion", + "type": "object", + "required": ["info_job_identity"], + "properties": { + "owner": { + "description": "The owner of the job", + "type": "string" + }, + "last_updated": { + "description": "The time when the job was last updated or created (ISO-8601)", + "type": "string" + }, + "info_job_identity": { + "description": "Identity of the Information Job", + "type": "string" + }, + "target_uri": { + "description": "URI for the target of the produced Information", + "type": "string" + }, + "info_job_data": { + "description": "Json for the job data", + "type": "object" + }, + "info_type_identity": { + "description": "Type identity for the job", + "type": "string" + } + } + }, + "error_information": { + "description": "Problem as defined in https://tools.ietf.org/html/rfc7807", + "type": "object", + "properties": { + "detail": { + "description": " A human-readable explanation specific to this occurrence of the problem.", + "type": "string", + "example": "Policy type not found" + }, + "status": { + "format": "int32", + "description": "The HTTP status code generated by the origin server for this occurrence of the problem. ", + "type": "integer", + "example": 503 + } + } + }, + "void": { + "description": "Void/empty", + "type": "object" + }, + "producer_registration_info": { + "description": "Information for an Information Producer", + "type": "object", + "required": [ + "info_job_callback_url", + "info_producer_supervision_callback_url", + "supported_info_types" + ], + "properties": { + "info_producer_supervision_callback_url": { + "description": "callback for producer supervision", + "type": "string" + }, + "supported_info_types": { + "description": "Supported Information Type IDs", + "type": "array", + "items": { + "description": "Supported Information Type IDs", + "type": "string" + } + }, + "info_job_callback_url": { + "description": "callback for Information Job", + "type": "string" + } + } + }, + "Link": { + "type": "object", + "properties": { + "templated": {"type": "boolean"}, + "href": {"type": "string"} + } + }, + "producer_info_type_info": { + "description": "Information for an Information Type", + "type": "object", + "required": [ + "info_job_data_schema", + "info_type_information" + ], + "properties": { + "info_type_information": { + "description": "Type specific information for the information type", + "type": "object" + }, + "info_job_data_schema": { + "description": "Json schema for the job data", + "type": "object" + } + } + } + }}, + "openapi": "3.0.1", + "paths": { + "/dmaap_dataproducer/info_job": { + "post": { + "summary": "Callback for Information Job creation/modification", + "requestBody": { + "content": {"application/json": {"schema": {"type": "string"}}}, + "required": true + }, + "description": "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.", + "operationId": "jobCreatedCallback", + "responses": { + "200": { + "description": "OK", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}} + }, + "404": { + "description": "Information type is not found", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}} + } + }, + "tags": ["Producer job control API"] + }, + "get": { + "summary": "Get all jobs", + "description": "Returns all info jobs, can be used for trouble shooting", + "operationId": "getJobs", + "responses": {"200": { + "description": "Information jobs", + "content": {"application/json": {"schema": { + "type": "array", + "items": {"$ref": "#/components/schemas/producer_info_job_request"} + }}} + }}, + "tags": ["Producer job control API"] + } + }, + "/dmaap_dataproducer/health_check": {"get": { + "summary": "Producer supervision", + "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.", + "operationId": "producerSupervision", + "responses": {"200": { + "description": "The producer is OK", + "content": {"application/json": {"schema": {"type": "string"}}} + }}, + "tags": ["Producer job control API"] + }}, + "/actuator/threaddump": {"get": { + "summary": "Actuator web endpoint 'threaddump'", + "operationId": "handle_2_1_3", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, + "/actuator/info": {"get": { + "summary": "Actuator web endpoint 'info'", + "operationId": "handle_9", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, + "/data-producer/v1/info-types/{infoTypeId}": {"put": { + "requestBody": { + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_info_type_info"}}}, + "required": true + }, + "operationId": "putInfoType", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"type": "object"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "infoTypeId", + "required": true + }], + "tags": ["Information Coordinator Service Simulator (exists only in test)"] + }}, + "/actuator/loggers": {"get": { + "summary": "Actuator web endpoint 'loggers'", + "operationId": "handle_6", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, + "/actuator/health/**": {"get": { + "summary": "Actuator web endpoint 'health-path'", + "operationId": "handle_12", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, + "/data-producer/v1/info-producers/{infoProducerId}": { + "get": { + "operationId": "getInfoProducer", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"type": "object"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "infoProducerId", + "required": true + }], + "tags": ["Information Coordinator Service Simulator (exists only in test)"] + }, + "put": { + "requestBody": { + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_registration_info"}}}, + "required": true + }, + "operationId": "putInfoProducer", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"type": "object"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "infoProducerId", + "required": true + }], + "tags": ["Information Coordinator Service Simulator (exists only in test)"] + } + }, + "/actuator/metrics/{requiredMetricName}": {"get": { + "summary": "Actuator web endpoint 'metrics-requiredMetricName'", + "operationId": "handle_5", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "requiredMetricName", + "required": true + }], + "tags": ["Actuator"] + }}, + "/actuator": {"get": { + "summary": "Actuator root web endpoint", + "operationId": "links_1", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": { + "additionalProperties": { + "additionalProperties": {"$ref": "#/components/schemas/Link"}, + "type": "object" + }, + "type": "object" + }}} + }}, + "tags": ["Actuator"] + }}, + "/actuator/logfile": {"get": { + "summary": "Actuator web endpoint 'logfile'", + "operationId": "handle_8", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, + "/actuator/loggers/{name}": { + "post": { + "summary": "Actuator web endpoint 'loggers-name'", + "operationId": "handle_0", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "name", + "required": true + }], + "tags": ["Actuator"] + }, + "get": { + "summary": "Actuator web endpoint 'loggers-name'", + "operationId": "handle_7", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "name", + "required": true + }], + "tags": ["Actuator"] + } + }, + "/dmaap_dataproducer/info_job/{infoJobId}": {"delete": { + "summary": "Callback for Information Job deletion", + "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.", + "operationId": "jobDeletedCallback", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "infoJobId", + "required": true + }], + "tags": ["Producer job control API"] + }}, + "/actuator/health": {"get": { + "summary": "Actuator web endpoint 'health'", + "operationId": "handle_11", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, + "/consumer": {"post": { + "summary": "Consume data", + "requestBody": { + "content": {"application/json": {"schema": {"type": "string"}}}, + "required": true + }, + "description": "The call is invoked to push data to consumer", + "operationId": "postData", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}} + }}, + "tags": ["Test Consumer Simulator (exists only in test)"] + }}, + "/dmaap-topic-1": {"get": { + "summary": "GET from topic", + "description": "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.", + "operationId": "getFromTopic", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}} + }}, + "tags": ["DMAAP Simulator (exists only in test)"] + }}, + "/actuator/metrics": {"get": { + "summary": "Actuator web endpoint 'metrics'", + "operationId": "handle_4", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }}, + "/actuator/heapdump": {"get": { + "summary": "Actuator web endpoint 'heapdump'", + "operationId": "handle_10", + "responses": {"200": { + "description": "OK", + "content": {"*/*": {"schema": {"type": "object"}}} + }}, + "tags": ["Actuator"] + }} + }, + "info": { + "license": { + "name": "Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.", + "url": "http://www.apache.org/licenses/LICENSE-2.0" + }, + "description": "Reads data from DMAAP and sends it further to information consumers", + "title": "Generic Dmaap Information Producer", + "version": "1.0" + }, + "tags": [{ + "name": "Actuator", + "description": "Monitor and interact", + "externalDocs": { + "description": "Spring Boot Actuator Web API Documentation", + "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/" + } + }] +} \ No newline at end of file diff --git a/dmaap-adaptor-java/api/api.yaml b/dmaap-adaptor-java/api/api.yaml new file mode 100644 index 00000000..3c9fb599 --- /dev/null +++ b/dmaap-adaptor-java/api/api.yaml @@ -0,0 +1,471 @@ +openapi: 3.0.1 +info: + title: Generic Dmaap Information Producer + description: Reads data from DMAAP and sends it further to information consumers + license: + name: Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License. + url: http://www.apache.org/licenses/LICENSE-2.0 + version: "1.0" +servers: +- url: / +tags: +- name: Actuator + description: Monitor and interact + externalDocs: + description: Spring Boot Actuator Web API Documentation + url: https://docs.spring.io/spring-boot/docs/current/actuator-api/html/ +paths: + /dmaap_dataproducer/info_job: + get: + tags: + - Producer job control API + summary: Get all jobs + description: Returns all info jobs, can be used for trouble shooting + operationId: getJobs + responses: + 200: + description: Information jobs + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/producer_info_job_request' + post: + tags: + - Producer job control API + summary: Callback for Information Job creation/modification + description: The call is invoked to activate or to modify a data subscription. + The endpoint is provided by the Information Producer. + operationId: jobCreatedCallback + requestBody: + content: + application/json: + schema: + type: string + required: true + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/void' + 404: + description: Information type is not found + content: + application/json: + schema: + $ref: '#/components/schemas/error_information' + /dmaap_dataproducer/health_check: + get: + tags: + - Producer job control API + summary: Producer supervision + description: The endpoint is provided by the Information Producer and is used + for supervision of the producer. + operationId: producerSupervision + responses: + 200: + description: The producer is OK + content: + application/json: + schema: + type: string + /actuator/threaddump: + get: + tags: + - Actuator + summary: Actuator web endpoint 'threaddump' + operationId: handle_2_1_3 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /actuator/info: + get: + tags: + - Actuator + summary: Actuator web endpoint 'info' + operationId: handle_9 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /data-producer/v1/info-types/{infoTypeId}: + put: + tags: + - Information Coordinator Service Simulator (exists only in test) + operationId: putInfoType + parameters: + - name: infoTypeId + in: path + required: true + style: simple + explode: false + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/producer_info_type_info' + required: true + responses: + 200: + description: OK + content: + application/json: + schema: + type: object + /actuator/loggers: + get: + tags: + - Actuator + summary: Actuator web endpoint 'loggers' + operationId: handle_6 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /actuator/health/**: + get: + tags: + - Actuator + summary: Actuator web endpoint 'health-path' + operationId: handle_12 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /data-producer/v1/info-producers/{infoProducerId}: + get: + tags: + - Information Coordinator Service Simulator (exists only in test) + operationId: getInfoProducer + parameters: + - name: infoProducerId + in: path + required: true + style: simple + explode: false + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: object + put: + tags: + - Information Coordinator Service Simulator (exists only in test) + operationId: putInfoProducer + parameters: + - name: infoProducerId + in: path + required: true + style: simple + explode: false + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/producer_registration_info' + required: true + responses: + 200: + description: OK + content: + application/json: + schema: + type: object + /actuator/metrics/{requiredMetricName}: + get: + tags: + - Actuator + summary: Actuator web endpoint 'metrics-requiredMetricName' + operationId: handle_5 + parameters: + - name: requiredMetricName + in: path + required: true + style: simple + explode: false + schema: + type: string + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /actuator: + get: + tags: + - Actuator + summary: Actuator root web endpoint + operationId: links_1 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + additionalProperties: + type: object + additionalProperties: + $ref: '#/components/schemas/Link' + /actuator/logfile: + get: + tags: + - Actuator + summary: Actuator web endpoint 'logfile' + operationId: handle_8 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /actuator/loggers/{name}: + get: + tags: + - Actuator + summary: Actuator web endpoint 'loggers-name' + operationId: handle_7 + parameters: + - name: name + in: path + required: true + style: simple + explode: false + schema: + type: string + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + post: + tags: + - Actuator + summary: Actuator web endpoint 'loggers-name' + operationId: handle_0 + parameters: + - name: name + in: path + required: true + style: simple + explode: false + schema: + type: string + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /dmaap_dataproducer/info_job/{infoJobId}: + delete: + tags: + - Producer job control API + summary: Callback for Information Job deletion + description: The call is invoked to terminate a data subscription. The endpoint + is provided by the Information Producer. + operationId: jobDeletedCallback + parameters: + - name: infoJobId + in: path + required: true + style: simple + explode: false + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/void' + /actuator/health: + get: + tags: + - Actuator + summary: Actuator web endpoint 'health' + operationId: handle_11 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /consumer: + post: + tags: + - Test Consumer Simulator (exists only in test) + summary: Consume data + description: The call is invoked to push data to consumer + operationId: postData + requestBody: + content: + application/json: + schema: + type: string + required: true + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/void' + /dmaap-topic-1: + get: + tags: + - DMAAP Simulator (exists only in test) + summary: GET from topic + description: The call is invoked to activate or to modify a data subscription. + The endpoint is provided by the Information Producer. + operationId: getFromTopic + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/void' + /actuator/metrics: + get: + tags: + - Actuator + summary: Actuator web endpoint 'metrics' + operationId: handle_4 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object + /actuator/heapdump: + get: + tags: + - Actuator + summary: Actuator web endpoint 'heapdump' + operationId: handle_10 + responses: + 200: + description: OK + content: + '*/*': + schema: + type: object +components: + schemas: + producer_info_job_request: + required: + - info_job_identity + type: object + properties: + owner: + type: string + description: The owner of the job + last_updated: + type: string + description: The time when the job was last updated or created (ISO-8601) + info_job_identity: + type: string + description: Identity of the Information Job + target_uri: + type: string + description: URI for the target of the produced Information + info_job_data: + type: object + description: Json for the job data + info_type_identity: + type: string + description: Type identity for the job + description: The body of the Information Producer callbacks for Information + Job creation and deletion + error_information: + type: object + properties: + detail: + type: string + description: ' A human-readable explanation specific to this occurrence + of the problem.' + example: Policy type not found + status: + type: integer + description: 'The HTTP status code generated by the origin server for this + occurrence of the problem. ' + format: int32 + example: 503 + description: Problem as defined in https://tools.ietf.org/html/rfc7807 + void: + type: object + description: Void/empty + producer_registration_info: + required: + - info_job_callback_url + - info_producer_supervision_callback_url + - supported_info_types + type: object + properties: + info_producer_supervision_callback_url: + type: string + description: callback for producer supervision + supported_info_types: + type: array + description: Supported Information Type IDs + items: + type: string + description: Supported Information Type IDs + info_job_callback_url: + type: string + description: callback for Information Job + description: Information for an Information Producer + Link: + type: object + properties: + templated: + type: boolean + href: + type: string + producer_info_type_info: + required: + - info_job_data_schema + - info_type_information + type: object + properties: + info_type_information: + type: object + description: Type specific information for the information type + info_job_data_schema: + type: object + description: Json schema for the job data + description: Information for an Information Type diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml index b327a0b6..5733ea77 100644 --- a/dmaap-adaptor-java/config/application.yaml +++ b/dmaap-adaptor-java/config/application.yaml @@ -11,7 +11,8 @@ management: exposure: # Enabling of springboot actuator features. See springboot documentation. include: "loggers,logfile,health,info,metrics,threaddump,heapdump" - +springdoc: + show-actuator: true logging: # Configuration of logging level: diff --git a/dmaap-adaptor-java/pom.xml b/dmaap-adaptor-java/pom.xml index 01f51e27..1fbd83c3 100644 --- a/dmaap-adaptor-java/pom.xml +++ b/dmaap-adaptor-java/pom.xml @@ -3,7 +3,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% -* Copyright (C) 2019 Nordix Foundation +* Copyright (C) 2021 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -301,7 +301,7 @@ openapi-yaml ${project.basedir}/api - ecs-api.yaml + api.yaml diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java new file mode 100644 index 00000000..8f33377e --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java @@ -0,0 +1,43 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter; + +import io.swagger.v3.oas.annotations.OpenAPIDefinition; +import io.swagger.v3.oas.annotations.info.Info; +import io.swagger.v3.oas.annotations.info.License; + +/** + * Swagger configuration class that uses swagger documentation type and scans + * all the controllers. To access the swagger gui go to + * http://ip:port/swagger-ui.html + */ +@OpenAPIDefinition( // + info = @Info(title = SwaggerConfig.API_TITLE, // + version = "1.0", // + description = SwaggerConfig.DESCRIPTION, // + license = @License(name = "Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.", + url = "http://www.apache.org/licenses/LICENSE-2.0"))) +public class SwaggerConfig { + private SwaggerConfig() {} + + static final String API_TITLE = "Generic Dmaap Information Producer"; + static final String DESCRIPTION = "Reads data from DMAAP and sends it further to information consumers"; +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index d4fe95ce..ca7c96cd 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -24,12 +24,16 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.ArrayList; +import java.util.Collection; + import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; @@ -52,7 +56,7 @@ import org.springframework.web.bind.annotation.RestController; public class ProducerCallbacksController { private static final Logger logger = LoggerFactory.getLogger(ProducerCallbacksController.class); - public static final String API_NAME = "Management of configuration"; + public static final String API_NAME = "Producer job control API"; public static final String API_DESCRIPTION = ""; public static final String JOB_URL = "/dmaap_dataproducer/info_job"; public static final String SUPERVISION_URL = "/dmaap_dataproducer/health_check"; @@ -70,7 +74,9 @@ public class ProducerCallbacksController { description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.") @ApiResponses(value = { // @ApiResponse(responseCode = "200", description = "OK", // - content = @Content(schema = @Schema(implementation = VoidResponse.class))) // + content = @Content(schema = @Schema(implementation = VoidResponse.class))), // + @ApiResponse(responseCode = "404", description = "Information type is not found", // + content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))), // }) public ResponseEntity jobCreatedCallback( // @RequestBody String body) { @@ -78,7 +84,8 @@ public class ProducerCallbacksController { ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class); logger.info("Job started callback {}", request.id); - Job job = new Job(request.id, request.targetUri, types.getType(request.typeId)); + Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner, + request.lastUpdated); this.jobs.put(job); return new ResponseEntity<>(HttpStatus.OK); } catch (Exception e) { @@ -86,6 +93,21 @@ public class ProducerCallbacksController { } } + @GetMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE) + @Operation(summary = "Get all jobs", description = "Returns all info jobs, can be used for trouble shooting") + @ApiResponse(responseCode = "200", // + description = "Information jobs", // + content = @Content(array = @ArraySchema(schema = @Schema(implementation = ProducerJobInfo.class)))) // + public ResponseEntity getJobs() { + + Collection producerJobs = new ArrayList<>(); + for (Job j : this.jobs.getAll()) { + producerJobs.add(new ProducerJobInfo(null, j.getId(), j.getType().getId(), j.getCallbackUrl(), j.getOwner(), + j.getLastUpdated())); + } + return new ResponseEntity<>(gson.toJson(producerJobs), HttpStatus.OK); + } + @DeleteMapping(path = JOB_URL + "/{infoJobId}", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Callback for Information Job deletion", description = "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.") @@ -109,7 +131,7 @@ public class ProducerCallbacksController { content = @Content(schema = @Schema(implementation = String.class))) // }) public ResponseEntity producerSupervision() { - logger.info("Producer supervision"); + logger.debug("Producer supervision"); return new ResponseEntity<>(HttpStatus.OK); } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java index 690e465b..0da94a62 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -33,10 +33,18 @@ public class Job { @Getter private final InfoType type; - public Job(String id, String callbackUrl, InfoType type) { + @Getter + private final String owner; + + @Getter + private final String lastUpdated; + + public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated) { this.id = id; this.callbackUrl = callbackUrl; this.type = type; + this.owner = owner; + this.lastUpdated = lastUpdated; } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index b9a50b39..837ca323 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -22,11 +22,12 @@ package org.oran.dmaapadapter.tasks; import com.google.gson.JsonParser; +import lombok.Getter; + import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; -import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; import org.oran.dmaapadapter.repository.InfoType; @@ -56,6 +57,7 @@ public class ProducerRegstrationTask { private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; + @Getter private boolean isRegisteredInEcs = false; private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; @@ -68,47 +70,58 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void supervisionTask() { - logger.debug("Checking producers starting"); - createTask().subscribe(null, null, () -> logger.debug("Producer registration completed")); + checkRegistration() // + .filter(isRegisterred -> !isRegisterred) // + .flatMap(isRegisterred -> registerTypesAndProducer()) // + .subscribe( // + null, // + this::handleRegistrationFailure, // + this::handleRegistrationCompleted); } - public Mono createTask() { - return checkProducerRegistration() // - .doOnError(t -> isRegisteredInEcs = false) // - .onErrorResume(t -> registerTypesAndProducer()); + private void handleRegistrationCompleted() { + logger.debug("Registering types and producer succeeded"); + isRegisteredInEcs = true; } - public boolean isRegisteredInEcs() { - return this.isRegisteredInEcs; + private void handleRegistrationFailure(Throwable t) { + logger.warn("Registration failed {}", t.getMessage()); + isRegisteredInEcs = false; } - private Mono checkProducerRegistration() { + private Mono checkRegistration() { final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return restClient.get(url) // - .flatMap(this::checkRegistrationInfo) // - ; + .flatMap(this::isRegisterredInfoCorrect) // + .onErrorResume(t -> Mono.just(Boolean.FALSE)); + } + + private Mono isRegisterredInfoCorrect(String registerredInfoStr) { + ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); + if (isEqual(producerRegistrationInfo(), registerredInfo)) { + logger.trace("Already registered"); + return Mono.just(Boolean.TRUE); + } else { + return Mono.just(Boolean.FALSE); + } } private String registerTypeUrl(InfoType type) { - String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); - return url; + return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); } private Mono registerTypesAndProducer() { + final int CONCURRENCY = 20; final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // - .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) // + .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())), + CONCURRENCY) // .collectList() // - .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) // - .onErrorResume(t -> { - logger.warn("Registration failed {}", t.getMessage()); - isRegisteredInEcs = false; - return Mono.empty(); - }) // - .doOnNext(x -> logger.debug("Registering types and producer completed")); + .doOnNext(type -> logger.info("Registering producer")) // + .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))); } private Object typeSpecifcInfoObject() { @@ -138,17 +151,6 @@ public class ProducerRegstrationTask { } } - private Mono checkRegistrationInfo(String resp) { - ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class); - if (isEqual(producerRegistrationInfo(), info)) { - logger.debug("Already registered"); - this.isRegisteredInEcs = true; - return Mono.empty(); - } else { - return Mono.error(new ServiceException("Producer registration will be started")); - } - } - private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) { return a.jobCallbackUrl.equals(b.jobCallbackUrl) // && a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) // @@ -160,7 +162,7 @@ public class ProducerRegstrationTask { return ProducerRegistrationInfo.builder() // .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) // .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) // - .supportedTypeIds(types.typeIds()) // + .supportedTypeIds(this.types.typeIds()) // .build(); } diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index cbaa59fa..b1c1780a 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -147,7 +147,6 @@ class ApplicationTest { this.consumerController.testResults.reset(); this.ecsSimulatorController.testResults.reset(); this.jobs.clear(); - this.types.clear(); } private AsyncRestClient restClient(boolean useTrustValidation) { @@ -240,7 +239,8 @@ class ApplicationTest { final String JOB_ID = "ID"; // Register producer, Register types - await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); // Create a job this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); @@ -254,9 +254,32 @@ class ApplicationTest { await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2)); assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); + String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL; + String jobs = restClient().get(jobUrl).block(); + assertThat(jobs).contains("ExampleInformationType"); + // Delete the job this.ecsSimulatorController.deleteJob(JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + + } + + @Test + void testReRegister() throws Exception { + // Wait foir register types and producer + await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + + // Clear the registration, should trigger a re-register + ecsSimulatorController.testResults.reset(); + await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + + // Just clear the registerred types, should trigger a re-register + ecsSimulatorController.testResults.types.clear(); + await().untilAsserted( + () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1)); + } private void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java index 1dbe83f7..4b6d9010 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -43,7 +43,7 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @RestController("ConsumerSimulatorController") -@Tag(name = "Consts.PRODUCER_API_CALLBACKS_NAME") +@Tag(name = "Test Consumer Simulator (exists only in test)") public class ConsumerController { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -64,7 +64,7 @@ public class ConsumerController { final TestResults testResults = new TestResults(); @PostMapping(path = CONSUMER_TARGET_URL, produces = MediaType.APPLICATION_JSON_VALUE) - @Operation(summary = "GET from topic", description = "The call is invoked to push data to consumer") + @Operation(summary = "Consume data", description = "The call is invoked to push data to consumer") @ApiResponses(value = { // @ApiResponse(responseCode = "200", description = "OK", // content = @Content(schema = @Schema(implementation = VoidResponse.class))) // diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java index fbb600f2..5259ee1a 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java @@ -42,8 +42,8 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -@RestController("ProducerSimulatorController") -@Tag(name = "ProducerConsts.PRODUCER_API_CALLBACKS_NAME") +@RestController("DmaapSimulatorController") +@Tag(name = "DMAAP Simulator (exists only in test)") public class DmaapSimulatorController { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java index 7542e0bd..828b027d 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java @@ -45,8 +45,8 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; -@RestController("EcsSimulatorController") -@Tag(name = "EcsSimulator") +@RestController("IcsSimulatorController") +@Tag(name = "Information Coordinator Service Simulator (exists only in test)") public class EcsSimulatorController { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -54,14 +54,16 @@ public class EcsSimulatorController { public static class TestResults { - ProducerRegistrationInfo registrationInfo; + ProducerRegistrationInfo registrationInfo = null; Map types = new HashMap<>(); + String infoProducerId = null; public TestResults() {} public void reset() { registrationInfo = null; types.clear(); + infoProducerId = null; } } @@ -86,6 +88,7 @@ public class EcsSimulatorController { @PathVariable("infoProducerId") String infoProducerId, // @RequestBody ProducerRegistrationInfo registrationInfo) { testResults.registrationInfo = registrationInfo; + testResults.infoProducerId = infoProducerId; return new ResponseEntity<>(HttpStatus.OK); } -- 2.16.6