Remove DMaaP Mediator Producer from repo 47/7847/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 1 Mar 2022 07:44:41 +0000 (08:44 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 1 Mar 2022 07:44:45 +0000 (08:44 +0100)
Issue-ID: NONRTRIC-714
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I47dd771ca54f68a61a31fd179cd8a1dfec702193

42 files changed:
dmaap-mediator-producer/.gitignore [deleted file]
dmaap-mediator-producer/Dockerfile [deleted file]
dmaap-mediator-producer/LICENSE.txt [deleted file]
dmaap-mediator-producer/README.md [deleted file]
dmaap-mediator-producer/api/docs.go [deleted file]
dmaap-mediator-producer/api/swagger.json [deleted file]
dmaap-mediator-producer/api/swagger.yaml [deleted file]
dmaap-mediator-producer/build-dmaapmediatorproducer-ubuntu.sh [deleted file]
dmaap-mediator-producer/configs/typeSchemaDmaap.json [deleted file]
dmaap-mediator-producer/configs/typeSchemaKafka.json [deleted file]
dmaap-mediator-producer/configs/type_config.json [deleted file]
dmaap-mediator-producer/container-tag.yaml [deleted file]
dmaap-mediator-producer/generate_swagger_docs.sh [deleted file]
dmaap-mediator-producer/go.mod [deleted file]
dmaap-mediator-producer/go.sum [deleted file]
dmaap-mediator-producer/internal/config/config.go [deleted file]
dmaap-mediator-producer/internal/config/config_test.go [deleted file]
dmaap-mediator-producer/internal/config/registrator.go [deleted file]
dmaap-mediator-producer/internal/config/registrator_test.go [deleted file]
dmaap-mediator-producer/internal/jobs/jobs.go [deleted file]
dmaap-mediator-producer/internal/jobs/jobs_test.go [deleted file]
dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go [deleted file]
dmaap-mediator-producer/internal/restclient/HTTPClient.go [deleted file]
dmaap-mediator-producer/internal/restclient/HTTPClient_test.go [deleted file]
dmaap-mediator-producer/internal/server/server.go [deleted file]
dmaap-mediator-producer/internal/server/server_test.go [deleted file]
dmaap-mediator-producer/main.go [deleted file]
dmaap-mediator-producer/main_test.go [deleted file]
dmaap-mediator-producer/mocks/KafkaConsumer.go [deleted file]
dmaap-mediator-producer/mocks/KafkaFactory.go [deleted file]
dmaap-mediator-producer/mocks/httpclient/HTTPClient.go [deleted file]
dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go [deleted file]
dmaap-mediator-producer/security/producer.crt [deleted file]
dmaap-mediator-producer/security/producer.key [deleted file]
dmaap-mediator-producer/stub/consumer/consumerstub.go [deleted file]
dmaap-mediator-producer/stub/dmaap/mrstub.go [deleted file]
dmaap-mediator-producer/stub/ics/ics.go [deleted file]
docs/api-docs.rst
docs/conf.py
docs/developer-guide.rst
docs/overview.rst
docs/release-notes.rst

diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore
deleted file mode 100644 (file)
index aa6ce10..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-.history
-.vscode
-coverage.*
-main
-dmaapmediatorproducer
-__debug_bin*
-consumer
-!consumer/
-dmaap
-!dmaap/
-ics
-!ics/
diff --git a/dmaap-mediator-producer/Dockerfile b/dmaap-mediator-producer/Dockerfile
deleted file mode 100644 (file)
index 6d9b2b8..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-#==================================================================================
-#   Copyright (C) 2021: Nordix Foundation
-#
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#
-#   This source code is part of the near-RT RIC (RAN Intelligent Controller)
-#   platform project (RICP).
-#==================================================================================
-
-##
-## Build
-##
-FROM nexus3.o-ran-sc.org:10001/golang:1.17-bullseye AS build
-WORKDIR /app
-COPY go.mod .
-COPY go.sum .
-RUN go mod download
-COPY . .
-RUN go build -o /dmaapmediatorproducer
-##
-## Deploy
-##
-FROM gcr.io/distroless/base-debian11
-WORKDIR /
-## Copy from "build" stage
-COPY --from=build /dmaapmediatorproducer .
-COPY --from=build /app/configs/* /configs/
-COPY --from=build /app/security/* /security/
-USER nonroot:nonroot
-ENTRYPOINT ["/dmaapmediatorproducer"]
diff --git a/dmaap-mediator-producer/LICENSE.txt b/dmaap-mediator-producer/LICENSE.txt
deleted file mode 100644 (file)
index 96589bf..0000000
+++ /dev/null
@@ -1,201 +0,0 @@
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "[]"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright [yyyy] [name of copyright owner]
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License. 
diff --git a/dmaap-mediator-producer/README.md b/dmaap-mediator-producer/README.md
deleted file mode 100644 (file)
index 6009a8f..0000000
+++ /dev/null
@@ -1,143 +0,0 @@
-# O-RAN-SC Non-RealTime RIC DMaaP Mediator Producer
-
-This product is a producer of Information Coordinator Service (ICS) jobs for polling topics in DMaaP Message Router (MR) and pushing the messages to a consumer.
-
-## Configuration
-
-The producer takes a number of environment variables, described below, as configuration.
-
->- INFO_PRODUCER_HOST  **Required**. The host for the producer.                                   Example: `https://mrproducer`
->- INFO_PRODUCER_PORT  Optional. The port for the product.                                        Defaults to `8085`.
->- INFO_COORD_ADDR     Optional. The address of the Information Coordinator.                      Defaults to `https://informationservice:8434`.
->- DMAAP_MR_ADDR       Optional. The address of the DMaaP Message Router.                         Defaults to `https://message-router.onap:3905`.
->- PRODUCER_CERT_PATH  Optional. The path to the certificate to use for https.                    Defaults to `security/producer.crt`
->- PRODUCER_KEY_PATH   Optional. The path to the key to the certificate to use for https.         Defaults to `security/producer.key`
->- LOG_LEVEL           Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`.  Defaults to `Info`.
-
-Any of the addresses used by this product can be configured to use https, by specifying it as the scheme of the address URI. Clients configured to use https will not use server certificate verification. The communication towards the consumers will use https if their callback address URI uses that scheme. The producer's own callback will only listen to the scheme configured in the scheme of the info producer host address.
-
-The configured public key and cerificate shall be PEM-encoded. A self signed certificate and key are provided in the `security` folder of the project. These files should be replaced for production. To generate a self signed key and certificate, use the example code below:
-
-    openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
-
-The file `configs/type_config.json` contains the configuration of job types that the producer will support, see example below.
-
-    {
-       "types":
-        [
-          {
-            "id": The ID of the job type, e.g. "STD_Fault_Messages",
-            "dmaapTopicUrl": The topic URL to poll from DMaaP Message Router, e.g. "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
-          },
-          {
-            "id": The ID of the job type, e.g. "Kafka_TestTopic",
-            "kafkaInputTopic": The Kafka topic to poll
-          }
-      ]
-    }
-
-Each information type has the following properties:
- - id the information type identity as exposed in the Information Coordination Service data consumer API
- - dmaapTopicUrl the URL to for fetching information from  DMaaP
- - kafkaInputTopic the Kafka topic to get input from
-
-Either the "dmaapTopicUrl" or the "kafkaInputTopic" must be provided for each type, not both.
-
-## Functionality
-
-At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. If ICS is unavailable, the producer will retry to connect indefinetely. The same goes for MR.
-
-Once the initial registration is done, the producer will constantly poll MR and/or Kafka for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
-
-The producer provides a REST API that fulfills the ICS Data producer API, see [Data producer (callbacks)](<https://docs.o-ran-sc.org/projects/o-ran-sc-nonrtric/en/latest/ics-api.html#tag/Data-producer-(callbacks)>). The health check method returns the registration status of the producer in ICS as JSON. It also provides a method to control the log level of the producer. The available log levels are the same as the ones used in the configuration above.
-
-    PUT https://mrproducer:8085/admin/log?level=<new level>
-
-The Swagger documentation of the producer's API is also available, through the `/swagger` path.
-
-When an Information Job is created in the Information Coordinator Service Consumer API, it is possible to define a number of job specific properties. For an Information type that has a Kafka topic defined, the following Json schema defines the properties that can be used:
-
-
-```sh
-{
-  "$schema": "http://json-schema.org/draft-04/schema#",
-  "type": "object",
-  "properties": {
-    "bufferTimeout": {
-      "type": "object",
-      "properties": {
-        "maxSize": {
-          "type": "integer"
-        },
-        "maxTimeMiliseconds": {
-          "type": "integer"
-        }
-      },
-      "additionalProperties": false,
-      "required": [
-        "maxSize",
-        "maxTimeMiliseconds"
-      ]
-    }
-  },
-  "additionalProperties": false
-}
-```
--bufferTimeout, can be used to reduce the number of REST calls to the consumer. If defined, a number of objects will be
- buffered and sent in one REST call to the consumer.
- The buffered objects will be put in a Json array and quoted. Example;
-   Object1 and Object2 may be posted in one call -->  ["Object1", "Object2"]
- The bufferTimeout is a Json object and the parameters in the object are:
-   - maxSize the maximum number of buffered objects before posting
-   - maxTimeMiliseconds the maximum delay time to buffer before posting
- If no bufferTimeout is specified, each object will be posted as received in separate calls (not quoted and put in a Json array).
-
-
-For an information type that only has a DMaaP topic, the following Json schema is used:
-
-```sh
-{
-  "$schema": "http://json-schema.org/draft-04/schema#",
-  "type": "object",
-  "properties": {
-  },
-  "additionalProperties": false
-}
-
-## Development
-
-To make it easy to test during development of the producer, three stubs are provided in the `stub` folder.
-
-One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
->1. cd stub/dmaap
->2. go build
->3. ./dmaap [-port \<PORT>]
-
-An ICS stub, under the `ics` folder, that listens for registration calls from the producer. When it gets a call it prints out the data of the call. By default, it listens to the port `8434`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
->1. cd stub/ics
->2. go build [-port \<PORT>]
->3. ./ics
-
-One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, if it is available, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
->1. cd stub/consumer
->2. go build
->3. ./consumer [-port \<PORT>]
-
-Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
-
-## License
-
-Copyright (C) 2021 Nordix Foundation.
-Licensed under the Apache License, Version 2.0 (the "License")
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-For more information about license please see the [LICENSE](LICENSE.txt) file for details.
diff --git a/dmaap-mediator-producer/api/docs.go b/dmaap-mediator-producer/api/docs.go
deleted file mode 100644 (file)
index dbfc42b..0000000
+++ /dev/null
@@ -1,303 +0,0 @@
-// Package api GENERATED BY THE COMMAND ABOVE; DO NOT EDIT
-// This file was generated by swaggo/swag
-package api
-
-import (
-       "bytes"
-       "encoding/json"
-       "strings"
-       "text/template"
-
-       "github.com/swaggo/swag"
-)
-
-var doc = `{
-    "schemes": {{ marshal .Schemes }},
-    "swagger": "2.0",
-    "info": {
-        "description": "{{escape .Description}}",
-        "title": "{{.Title}}",
-        "contact": {},
-        "license": {
-            "name": "Apache 2.0",
-            "url": "http://www.apache.org/licenses/LICENSE-2.0.html"
-        },
-        "version": "{{.Version}}"
-    },
-    "host": "{{.Host}}",
-    "basePath": "{{.BasePath}}",
-    "paths": {
-        "/admin/log": {
-            "put": {
-                "description": "Set the log level of the producer.",
-                "tags": [
-                    "Admin"
-                ],
-                "summary": "Set log level",
-                "parameters": [
-                    {
-                        "enum": [
-                            "Error",
-                            "Warn",
-                            "Info",
-                            "Debug"
-                        ],
-                        "type": "string",
-                        "description": "string enums",
-                        "name": "level",
-                        "in": "query"
-                    }
-                ],
-                "responses": {
-                    "200": {
-                        "description": ""
-                    },
-                    "400": {
-                        "description": "Problem as defined in https://tools.ietf.org/html/rfc7807",
-                        "schema": {
-                            "$ref": "#/definitions/ErrorInfo"
-                        },
-                        "headers": {
-                            "Content-Type": {
-                                "type": "string",
-                                "description": "application/problem+json"
-                            }
-                        }
-                    }
-                }
-            }
-        },
-        "/health_check": {
-            "get": {
-                "description": "Get the status of the producer. Will show if the producer has registered in ICS.",
-                "produces": [
-                    "application/json"
-                ],
-                "tags": [
-                    "Data producer (callbacks)"
-                ],
-                "summary": "Get status",
-                "responses": {
-                    "200": {
-                        "description": "OK",
-                        "schema": {
-                            "$ref": "#/definitions/"
-                        }
-                    }
-                }
-            }
-        },
-        "/info_job": {
-            "post": {
-                "description": "Callback for ICS to add an info job",
-                "consumes": [
-                    "application/json"
-                ],
-                "tags": [
-                    "Data producer (callbacks)"
-                ],
-                "summary": "Add info job",
-                "parameters": [
-                    {
-                        "description": "Info job data",
-                        "name": "user",
-                        "in": "body",
-                        "required": true,
-                        "schema": {
-                            "$ref": "#/definitions/JobInfo"
-                        }
-                    }
-                ],
-                "responses": {
-                    "200": {
-                        "description": ""
-                    },
-                    "400": {
-                        "description": "Problem as defined in https://tools.ietf.org/html/rfc7807",
-                        "schema": {
-                            "$ref": "#/definitions/ErrorInfo"
-                        },
-                        "headers": {
-                            "Content-Type": {
-                                "type": "string",
-                                "description": "application/problem+json"
-                            }
-                        }
-                    }
-                }
-            }
-        },
-        "/info_job/{infoJobId}": {
-            "delete": {
-                "description": "Callback for ICS to delete an info job",
-                "tags": [
-                    "Data producer (callbacks)"
-                ],
-                "summary": "Delete info job",
-                "parameters": [
-                    {
-                        "type": "string",
-                        "description": "Info job ID",
-                        "name": "infoJobId",
-                        "in": "path",
-                        "required": true
-                    }
-                ],
-                "responses": {
-                    "200": {
-                        "description": ""
-                    }
-                }
-            }
-        },
-        "/swagger": {
-            "get": {
-                "description": "Get the Swagger API documentation for the producer.",
-                "tags": [
-                    "Admin"
-                ],
-                "summary": "Get Swagger Documentation",
-                "responses": {
-                    "200": {
-                        "description": ""
-                    }
-                }
-            }
-        }
-    },
-    "definitions": {
-        "": {
-            "type": "object",
-            "properties": {
-                "registeredStatus": {
-                    "description": "The registration status of the producer in Information Coordinator Service. Either ` + "`" + `registered` + "`" + ` or ` + "`" + `not registered` + "`" + `",
-                    "type": "string",
-                    "example": "registered"
-                }
-            }
-        },
-        "BufferTimeout": {
-            "type": "object",
-            "properties": {
-                "maxSize": {
-                    "type": "integer"
-                },
-                "maxTimeMiliseconds": {
-                    "type": "integer"
-                }
-            }
-        },
-        "ErrorInfo": {
-            "type": "object",
-            "properties": {
-                "detail": {
-                    "description": "A human-readable explanation specific to this occurrence of the problem.",
-                    "type": "string",
-                    "example": "Info job type not found"
-                },
-                "instance": {
-                    "description": "A URI reference that identifies the specific occurrence of the problem.",
-                    "type": "string"
-                },
-                "status": {
-                    "description": "The HTTP status code generated by the origin server for this occurrence of the problem.",
-                    "type": "integer",
-                    "example": 400
-                },
-                "title": {
-                    "description": "A short, human-readable summary of the problem type.",
-                    "type": "string"
-                },
-                "type": {
-                    "description": "A URI reference that identifies the problem type.",
-                    "type": "string"
-                }
-            }
-        },
-        "JobInfo": {
-            "type": "object",
-            "properties": {
-                "info_job_data": {
-                    "$ref": "#/definitions/Parameters"
-                },
-                "info_job_identity": {
-                    "type": "string"
-                },
-                "info_type_identity": {
-                    "type": "string"
-                },
-                "last_updated": {
-                    "type": "string"
-                },
-                "owner": {
-                    "type": "string"
-                },
-                "target_uri": {
-                    "type": "string"
-                }
-            }
-        },
-        "Parameters": {
-            "type": "object",
-            "properties": {
-                "bufferTimeout": {
-                    "$ref": "#/definitions/BufferTimeout"
-                }
-            }
-        }
-    }
-}`
-
-type swaggerInfo struct {
-       Version     string
-       Host        string
-       BasePath    string
-       Schemes     []string
-       Title       string
-       Description string
-}
-
-// SwaggerInfo holds exported Swagger Info so clients can modify it
-var SwaggerInfo = swaggerInfo{
-       Version:     "1.1.0",
-       Host:        "",
-       BasePath:    "",
-       Schemes:     []string{},
-       Title:       "DMaaP Mediator Producer",
-       Description: "",
-}
-
-type s struct{}
-
-func (s *s) ReadDoc() string {
-       sInfo := SwaggerInfo
-       sInfo.Description = strings.Replace(sInfo.Description, "\n", "\\n", -1)
-
-       t, err := template.New("swagger_info").Funcs(template.FuncMap{
-               "marshal": func(v interface{}) string {
-                       a, _ := json.Marshal(v)
-                       return string(a)
-               },
-               "escape": func(v interface{}) string {
-                       // escape tabs
-                       str := strings.Replace(v.(string), "\t", "\\t", -1)
-                       // replace " with \", and if that results in \\", replace that with \\\"
-                       str = strings.Replace(str, "\"", "\\\"", -1)
-                       return strings.Replace(str, "\\\\\"", "\\\\\\\"", -1)
-               },
-       }).Parse(doc)
-       if err != nil {
-               return doc
-       }
-
-       var tpl bytes.Buffer
-       if err := t.Execute(&tpl, sInfo); err != nil {
-               return doc
-       }
-
-       return tpl.String()
-}
-
-func init() {
-       swag.Register("swagger", &s{})
-}
diff --git a/dmaap-mediator-producer/api/swagger.json b/dmaap-mediator-producer/api/swagger.json
deleted file mode 100644 (file)
index 8910022..0000000
+++ /dev/null
@@ -1,232 +0,0 @@
-{
-    "swagger": "2.0",
-    "info": {
-        "title": "DMaaP Mediator Producer",
-        "contact": {},
-        "license": {
-            "name": "Apache 2.0",
-            "url": "http://www.apache.org/licenses/LICENSE-2.0.html"
-        },
-        "version": "1.1.0"
-    },
-    "paths": {
-        "/admin/log": {
-            "put": {
-                "description": "Set the log level of the producer.",
-                "tags": [
-                    "Admin"
-                ],
-                "summary": "Set log level",
-                "parameters": [
-                    {
-                        "enum": [
-                            "Error",
-                            "Warn",
-                            "Info",
-                            "Debug"
-                        ],
-                        "type": "string",
-                        "description": "string enums",
-                        "name": "level",
-                        "in": "query"
-                    }
-                ],
-                "responses": {
-                    "200": {
-                        "description": ""
-                    },
-                    "400": {
-                        "description": "Problem as defined in https://tools.ietf.org/html/rfc7807",
-                        "schema": {
-                            "$ref": "#/definitions/ErrorInfo"
-                        },
-                        "headers": {
-                            "Content-Type": {
-                                "type": "string",
-                                "description": "application/problem+json"
-                            }
-                        }
-                    }
-                }
-            }
-        },
-        "/health_check": {
-            "get": {
-                "description": "Get the status of the producer. Will show if the producer has registered in ICS.",
-                "produces": [
-                    "application/json"
-                ],
-                "tags": [
-                    "Data producer (callbacks)"
-                ],
-                "summary": "Get status",
-                "responses": {
-                    "200": {
-                        "description": "OK",
-                        "schema": {
-                            "$ref": "#/definitions/"
-                        }
-                    }
-                }
-            }
-        },
-        "/info_job": {
-            "post": {
-                "description": "Callback for ICS to add an info job",
-                "consumes": [
-                    "application/json"
-                ],
-                "tags": [
-                    "Data producer (callbacks)"
-                ],
-                "summary": "Add info job",
-                "parameters": [
-                    {
-                        "description": "Info job data",
-                        "name": "user",
-                        "in": "body",
-                        "required": true,
-                        "schema": {
-                            "$ref": "#/definitions/JobInfo"
-                        }
-                    }
-                ],
-                "responses": {
-                    "200": {
-                        "description": ""
-                    },
-                    "400": {
-                        "description": "Problem as defined in https://tools.ietf.org/html/rfc7807",
-                        "schema": {
-                            "$ref": "#/definitions/ErrorInfo"
-                        },
-                        "headers": {
-                            "Content-Type": {
-                                "type": "string",
-                                "description": "application/problem+json"
-                            }
-                        }
-                    }
-                }
-            }
-        },
-        "/info_job/{infoJobId}": {
-            "delete": {
-                "description": "Callback for ICS to delete an info job",
-                "tags": [
-                    "Data producer (callbacks)"
-                ],
-                "summary": "Delete info job",
-                "parameters": [
-                    {
-                        "type": "string",
-                        "description": "Info job ID",
-                        "name": "infoJobId",
-                        "in": "path",
-                        "required": true
-                    }
-                ],
-                "responses": {
-                    "200": {
-                        "description": ""
-                    }
-                }
-            }
-        },
-        "/swagger": {
-            "get": {
-                "description": "Get the Swagger API documentation for the producer.",
-                "tags": [
-                    "Admin"
-                ],
-                "summary": "Get Swagger Documentation",
-                "responses": {
-                    "200": {
-                        "description": ""
-                    }
-                }
-            }
-        }
-    },
-    "definitions": {
-        "": {
-            "type": "object",
-            "properties": {
-                "registeredStatus": {
-                    "description": "The registration status of the producer in Information Coordinator Service. Either `registered` or `not registered`",
-                    "type": "string",
-                    "example": "registered"
-                }
-            }
-        },
-        "BufferTimeout": {
-            "type": "object",
-            "properties": {
-                "maxSize": {
-                    "type": "integer"
-                },
-                "maxTimeMiliseconds": {
-                    "type": "integer"
-                }
-            }
-        },
-        "ErrorInfo": {
-            "type": "object",
-            "properties": {
-                "detail": {
-                    "description": "A human-readable explanation specific to this occurrence of the problem.",
-                    "type": "string",
-                    "example": "Info job type not found"
-                },
-                "instance": {
-                    "description": "A URI reference that identifies the specific occurrence of the problem.",
-                    "type": "string"
-                },
-                "status": {
-                    "description": "The HTTP status code generated by the origin server for this occurrence of the problem.",
-                    "type": "integer",
-                    "example": 400
-                },
-                "title": {
-                    "description": "A short, human-readable summary of the problem type.",
-                    "type": "string"
-                },
-                "type": {
-                    "description": "A URI reference that identifies the problem type.",
-                    "type": "string"
-                }
-            }
-        },
-        "JobInfo": {
-            "type": "object",
-            "properties": {
-                "info_job_data": {
-                    "$ref": "#/definitions/Parameters"
-                },
-                "info_job_identity": {
-                    "type": "string"
-                },
-                "info_type_identity": {
-                    "type": "string"
-                },
-                "last_updated": {
-                    "type": "string"
-                },
-                "owner": {
-                    "type": "string"
-                },
-                "target_uri": {
-                    "type": "string"
-                }
-            }
-        },
-        "Parameters": {
-            "type": "object",
-            "properties": {
-                "bufferTimeout": {
-                    "$ref": "#/definitions/BufferTimeout"
-                }
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/api/swagger.yaml b/dmaap-mediator-producer/api/swagger.yaml
deleted file mode 100644 (file)
index adf70a8..0000000
+++ /dev/null
@@ -1,159 +0,0 @@
-definitions:
-  "":
-    properties:
-      registeredStatus:
-        description: The registration status of the producer in Information Coordinator
-          Service. Either `registered` or `not registered`
-        example: registered
-        type: string
-    type: object
-  BufferTimeout:
-    properties:
-      maxSize:
-        type: integer
-      maxTimeMiliseconds:
-        type: integer
-    type: object
-  ErrorInfo:
-    properties:
-      detail:
-        description: A human-readable explanation specific to this occurrence of the
-          problem.
-        example: Info job type not found
-        type: string
-      instance:
-        description: A URI reference that identifies the specific occurrence of the
-          problem.
-        type: string
-      status:
-        description: The HTTP status code generated by the origin server for this
-          occurrence of the problem.
-        example: 400
-        type: integer
-      title:
-        description: A short, human-readable summary of the problem type.
-        type: string
-      type:
-        description: A URI reference that identifies the problem type.
-        type: string
-    type: object
-  JobInfo:
-    properties:
-      info_job_data:
-        $ref: '#/definitions/Parameters'
-      info_job_identity:
-        type: string
-      info_type_identity:
-        type: string
-      last_updated:
-        type: string
-      owner:
-        type: string
-      target_uri:
-        type: string
-    type: object
-  Parameters:
-    properties:
-      bufferTimeout:
-        $ref: '#/definitions/BufferTimeout'
-    type: object
-info:
-  contact: {}
-  license:
-    name: Apache 2.0
-    url: http://www.apache.org/licenses/LICENSE-2.0.html
-  title: DMaaP Mediator Producer
-  version: 1.1.0
-paths:
-  /admin/log:
-    put:
-      description: Set the log level of the producer.
-      parameters:
-      - description: string enums
-        enum:
-        - Error
-        - Warn
-        - Info
-        - Debug
-        in: query
-        name: level
-        type: string
-      responses:
-        "200":
-          description: ""
-        "400":
-          description: Problem as defined in https://tools.ietf.org/html/rfc7807
-          headers:
-            Content-Type:
-              description: application/problem+json
-              type: string
-          schema:
-            $ref: '#/definitions/ErrorInfo'
-      summary: Set log level
-      tags:
-      - Admin
-  /health_check:
-    get:
-      description: Get the status of the producer. Will show if the producer has registered
-        in ICS.
-      produces:
-      - application/json
-      responses:
-        "200":
-          description: OK
-          schema:
-            $ref: '#/definitions/'
-      summary: Get status
-      tags:
-      - Data producer (callbacks)
-  /info_job:
-    post:
-      consumes:
-      - application/json
-      description: Callback for ICS to add an info job
-      parameters:
-      - description: Info job data
-        in: body
-        name: user
-        required: true
-        schema:
-          $ref: '#/definitions/JobInfo'
-      responses:
-        "200":
-          description: ""
-        "400":
-          description: Problem as defined in https://tools.ietf.org/html/rfc7807
-          headers:
-            Content-Type:
-              description: application/problem+json
-              type: string
-          schema:
-            $ref: '#/definitions/ErrorInfo'
-      summary: Add info job
-      tags:
-      - Data producer (callbacks)
-  /info_job/{infoJobId}:
-    delete:
-      description: Callback for ICS to delete an info job
-      parameters:
-      - description: Info job ID
-        in: path
-        name: infoJobId
-        required: true
-        type: string
-      responses:
-        "200":
-          description: ""
-      summary: Delete info job
-      tags:
-      - Data producer (callbacks)
-  /swagger:
-    get:
-      description: Get the Swagger API documentation for the producer.
-      responses:
-        "200":
-          description: ""
-      summary: Get Swagger Documentation
-      tags:
-      - Admin
-swagger: "2.0"
diff --git a/dmaap-mediator-producer/build-dmaapmediatorproducer-ubuntu.sh b/dmaap-mediator-producer/build-dmaapmediatorproducer-ubuntu.sh
deleted file mode 100755 (executable)
index c846b1c..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/bin/bash
-##############################################################################
-#
-#   Copyright (C) 2021: Nordix Foundation
-#
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#
-##############################################################################
-set -eux
-
-echo "--> build-dmaapmediatorproducer-ubuntu.sh"
-curdir=`pwd`
-# go installs tools like go-acc to $HOME/go/bin
-# ubuntu minion path lacks go
-export PATH=$PATH:/usr/local/go/bin:$HOME/go/bin
-go version
-cd dmaap-mediator-producer
-
-# install the go coverage tool helper
-go get -v github.com/ory/go-acc
-
-export GO111MODULE=on
-go get github.com/stretchr/testify/mock@v1.7.0
-
-go-acc ./... --ignore mocks
-
-sed -i -e 's/oransc\.org\/nonrtric\/dmaapmediatorproducer/dmaap-mediator-producer/' coverage.txt
-
-cp coverage.txt $curdir
-echo "--> build-dmaapmediatorproducer-ubuntu.sh ends"
diff --git a/dmaap-mediator-producer/configs/typeSchemaDmaap.json b/dmaap-mediator-producer/configs/typeSchemaDmaap.json
deleted file mode 100644 (file)
index 4abee49..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-{
-  "$schema": "http://json-schema.org/draft-04/schema#",
-  "type": "object",
-  "properties": {
-  },
-  "additionalProperties": false
-}
diff --git a/dmaap-mediator-producer/configs/typeSchemaKafka.json b/dmaap-mediator-producer/configs/typeSchemaKafka.json
deleted file mode 100644 (file)
index 9c3980f..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-  "$schema": "http://json-schema.org/draft-04/schema#",
-  "type": "object",
-  "properties": {
-    "bufferTimeout": {
-      "type": "object",
-      "properties": {
-        "maxSize": {
-          "type": "integer"
-        },
-        "maxTimeMiliseconds": {
-          "type": "integer"
-        }
-      },
-      "additionalProperties": false,
-      "required": [
-        "maxSize",
-        "maxTimeMiliseconds"
-      ]
-    }
-  },
-  "additionalProperties": false
-}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json
deleted file mode 100644 (file)
index 1149669..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-  "types":
-    [
-      {
-        "id": "STD_Fault_Messages",
-        "dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
-      },
-      {
-        "id": "Kafka_TestTopic",
-        "kafkaInputTopic": "TestTopic"
-      }
-  ]
-}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/container-tag.yaml b/dmaap-mediator-producer/container-tag.yaml
deleted file mode 100644 (file)
index f84eeb1..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-# The Jenkins job requires a tag to build the Docker image.
-# By default this file is in the docker build directory,
-# but the location can configured in the JJB template.
----
-tag: 1.1.0
diff --git a/dmaap-mediator-producer/generate_swagger_docs.sh b/dmaap-mediator-producer/generate_swagger_docs.sh
deleted file mode 100755 (executable)
index 8a13f30..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-##############################################################################
-#
-#   Copyright (C) 2022: Nordix Foundation
-#
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#
-##############################################################################
-
-go get -u github.com/swaggo/swag/cmd/swag
-swag init --output api
-swag fmt
\ No newline at end of file
diff --git a/dmaap-mediator-producer/go.mod b/dmaap-mediator-producer/go.mod
deleted file mode 100644 (file)
index ea7b361..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-module oransc.org/nonrtric/dmaapmediatorproducer
-
-go 1.17
-
-require (
-       github.com/confluentinc/confluent-kafka-go v1.8.2
-       github.com/gorilla/mux v1.8.0
-       github.com/hashicorp/go-retryablehttp v0.7.0
-       github.com/sirupsen/logrus v1.8.1
-       github.com/stretchr/testify v1.7.0
-       github.com/swaggo/http-swagger v1.1.2
-       github.com/swaggo/swag v1.7.8
-)
-
-require (
-       github.com/KyleBanks/depth v1.2.1 // indirect
-       github.com/PuerkitoBio/purell v1.1.1 // indirect
-       github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
-       github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
-       github.com/davecgh/go-spew v1.1.1 // indirect
-       github.com/ghodss/yaml v1.0.0 // indirect
-       github.com/go-openapi/jsonpointer v0.19.5 // indirect
-       github.com/go-openapi/jsonreference v0.19.6 // indirect
-       github.com/go-openapi/spec v0.20.4 // indirect
-       github.com/go-openapi/swag v0.19.15 // indirect
-       github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
-       github.com/josharian/intern v1.0.0 // indirect
-       github.com/mailru/easyjson v0.7.7 // indirect
-       github.com/pmezard/go-difflib v1.0.0 // indirect
-       github.com/russross/blackfriday/v2 v2.1.0 // indirect
-       github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
-       github.com/stretchr/objx v0.1.0 // indirect
-       github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 // indirect
-       github.com/urfave/cli/v2 v2.3.0 // indirect
-       golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
-       golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e // indirect
-       golang.org/x/text v0.3.7 // indirect
-       golang.org/x/tools v0.1.7 // indirect
-       gopkg.in/yaml.v2 v2.4.0 // indirect
-       gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
-)
diff --git a/dmaap-mediator-producer/go.sum b/dmaap-mediator-producer/go.sum
deleted file mode 100644 (file)
index f7a6405..0000000
+++ /dev/null
@@ -1,137 +0,0 @@
-github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
-github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
-github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI=
-github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
-github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
-github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
-github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
-github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
-github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
-github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
-github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU=
-github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
-github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
-github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
-github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
-github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
-github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
-github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
-github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
-github.com/go-openapi/jsonreference v0.19.4/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE69AqPYEJeo/TWfEeg=
-github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE69AqPYEJeo/TWfEeg=
-github.com/go-openapi/jsonreference v0.19.6 h1:UBIxjkht+AWIgYzCDSv2GN+E/togfwXUJFRTWhl2Jjs=
-github.com/go-openapi/jsonreference v0.19.6/go.mod h1:diGHMEHg2IqXZGKxqyvWdfWU/aim5Dprw5bqpKkTvns=
-github.com/go-openapi/spec v0.19.14/go.mod h1:gwrgJS15eCUgjLpMjBJmbZezCsw88LmgeEip0M63doA=
-github.com/go-openapi/spec v0.20.0/go.mod h1:+81FIL1JwC5P3/Iuuozq3pPE9dXdIEGxFutcFKaVbmU=
-github.com/go-openapi/spec v0.20.4 h1:O8hJrt0UMnhHcluhIdUgCLRWyM2x7QkBXRvOs7m+O1M=
-github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I=
-github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
-github.com/go-openapi/swag v0.19.11/go.mod h1:Uc0gKkdR+ojzsEpjh39QChyu92vPgIr72POcgHMAgSY=
-github.com/go-openapi/swag v0.19.12/go.mod h1:eFdyEBkTdoAf/9RXBvj4cr1nH7GD8Kzo5HTt47gr72M=
-github.com/go-openapi/swag v0.19.15 h1:D2NRCBzS9/pEY3gP9Nl8aDqGUcPFrwG2p+CNFrLyrCM=
-github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
-github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
-github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
-github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
-github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
-github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
-github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
-github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
-github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
-github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
-github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
-github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
-github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
-github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
-github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
-github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
-github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
-github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
-github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
-github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
-github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
-github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
-github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
-github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
-github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
-github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
-github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
-github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
-github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
-github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
-github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
-github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14/go.mod h1:gxQT6pBGRuIGunNf/+tSOB5OHvguWi8Tbt82WOkf35E=
-github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 h1:+iNTcqQJy0OZ5jk6a5NLib47eqXK8uYcPX+O4+cBpEM=
-github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2/go.mod h1:lKJPbtWzJ9JhsTN1k1gZgleJWY/cqq0psdoMmaThG3w=
-github.com/swaggo/http-swagger v1.1.2 h1:ikcSD+EUOx+2oNZ2N6u8IYa8ScOsAvE7Jh+E1dW6i94=
-github.com/swaggo/http-swagger v1.1.2/go.mod h1:mX5nhypDmoSt4iw2mc5aKXxRFvp1CLLcCiog2B9M+Ro=
-github.com/swaggo/swag v1.7.0/go.mod h1:BdPIL73gvS9NBsdi7M1JOxLvlbfvNRaBP8m6WT6Aajo=
-github.com/swaggo/swag v1.7.8 h1:w249t0l/kc/DKMGlS0fppNJQxKyJ8heNaUWB6nsH3zc=
-github.com/swaggo/swag v1.7.8/go.mod h1:gZ+TJ2w/Ve1RwQsA2IRoSOTidHz6DX+PIG8GWvbnoLU=
-github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
-github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
-github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
-golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
-golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/net v0.0.0-20201207224615-747e23833adb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
-golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
-golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
-golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
-golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
-golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.0.0-20201120155355-20be4ac4bd6e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
-golang.org/x/tools v0.0.0-20201208062317-e652b2f42cc7/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
-golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
-golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
-golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
-gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
-gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
-gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
deleted file mode 100644 (file)
index 7582e9c..0000000
+++ /dev/null
@@ -1,133 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package config
-
-import (
-       "encoding/json"
-       "fmt"
-       "os"
-       "path/filepath"
-       "strconv"
-
-       log "github.com/sirupsen/logrus"
-)
-
-type Config struct {
-       LogLevel               log.Level
-       InfoProducerHost       string
-       InfoProducerPort       int
-       InfoCoordinatorAddress string
-       DMaaPMRAddress         string
-       KafkaBootstrapServers  string
-       ProducerCertPath       string
-       ProducerKeyPath        string
-}
-
-func New() *Config {
-       return &Config{
-               InfoProducerHost:       getEnv("INFO_PRODUCER_HOST", ""),
-               InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
-               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
-               DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
-               KafkaBootstrapServers:  getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
-               ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
-               ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
-               LogLevel:               getLogLevel(),
-       }
-}
-
-func (c Config) String() string {
-       return fmt.Sprintf("InfoProducerHost: %v, InfoProducerPort: %v, InfoCoordinatorAddress: %v, DMaaPMRAddress: %v, ProducerCertPath: %v, ProducerKeyPath: %v, LogLevel: %v", c.InfoProducerHost, c.InfoProducerPort, c.InfoCoordinatorAddress, c.DMaaPMRAddress, c.ProducerCertPath, c.ProducerKeyPath, c.LogLevel)
-}
-func getEnv(key string, defaultVal string) string {
-       if value, exists := os.LookupEnv(key); exists {
-               return value
-       }
-
-       return defaultVal
-}
-
-func getEnvAsInt(name string, defaultVal int) int {
-       valueStr := getEnv(name, "")
-       if value, err := strconv.Atoi(valueStr); err == nil {
-               return value
-       } else if valueStr != "" {
-               log.Warnf("Invalid int value: %v for variable: %v. Default value: %v will be used", valueStr, name, defaultVal)
-       }
-
-       return defaultVal
-}
-
-func getLogLevel() log.Level {
-       logLevelStr := getEnv("LOG_LEVEL", "Info")
-       if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
-               return loglevel
-       } else {
-               log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr)
-               return log.InfoLevel
-       }
-}
-
-func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
-       typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
-       if err != nil {
-               return nil, err
-       }
-       typeDefs := struct {
-               Types []TypeDefinition `json:"types"`
-       }{}
-       err = json.Unmarshal(typeDefsByte, &typeDefs)
-       if err != nil {
-               return nil, err
-       }
-
-       kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
-       if err != nil {
-               return nil, err
-       }
-
-       dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
-       if err != nil {
-               return nil, err
-       }
-
-       for i, typeDef := range typeDefs.Types {
-               if typeDef.IsKafkaType() {
-                       typeDefs.Types[i].TypeSchema = kafkaTypeSchema
-               } else {
-                       typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
-               }
-       }
-       return typeDefs.Types, nil
-}
-
-func getTypeSchema(schemaFile string) (interface{}, error) {
-       typeDefsByte, err := os.ReadFile(schemaFile)
-       if err != nil {
-               return nil, err
-       }
-       var schema interface{}
-       err = json.Unmarshal(typeDefsByte, &schema)
-       if err != nil {
-               return nil, err
-       }
-       return schema, nil
-}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
deleted file mode 100644 (file)
index 0e081a8..0000000
+++ /dev/null
@@ -1,174 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package config
-
-import (
-       "bytes"
-       "encoding/json"
-       "os"
-       "path/filepath"
-       "testing"
-
-       log "github.com/sirupsen/logrus"
-       "github.com/stretchr/testify/require"
-)
-
-func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
-       assertions := require.New(t)
-       os.Setenv("LOG_LEVEL", "Debug")
-       os.Setenv("INFO_PRODUCER_HOST", "producerHost")
-       os.Setenv("INFO_PRODUCER_PORT", "8095")
-       os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
-       os.Setenv("DMAAP_MR_ADDR", "mrHost:3908")
-       os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9093")
-       os.Setenv("PRODUCER_CERT_PATH", "cert")
-       os.Setenv("PRODUCER_KEY_PATH", "key")
-       t.Cleanup(func() {
-               os.Clearenv()
-       })
-       wantConfig := Config{
-               LogLevel:               log.DebugLevel,
-               InfoProducerHost:       "producerHost",
-               InfoProducerPort:       8095,
-               InfoCoordinatorAddress: "infoCoordAddr",
-               DMaaPMRAddress:         "mrHost:3908",
-               KafkaBootstrapServers:  "localhost:9093",
-               ProducerCertPath:       "cert",
-               ProducerKeyPath:        "key",
-       }
-       got := New()
-
-       assertions.Equal(&wantConfig, got)
-}
-
-func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) {
-       assertions := require.New(t)
-       var buf bytes.Buffer
-       log.SetOutput(&buf)
-
-       os.Setenv("INFO_PRODUCER_PORT", "wrong")
-       t.Cleanup(func() {
-               log.SetOutput(os.Stderr)
-               os.Clearenv()
-       })
-       wantConfig := Config{
-               LogLevel:               log.InfoLevel,
-               InfoProducerHost:       "",
-               InfoProducerPort:       8085,
-               InfoCoordinatorAddress: "https://informationservice:8434",
-               DMaaPMRAddress:         "https://message-router.onap:3905",
-               KafkaBootstrapServers:  "localhost:9092",
-               ProducerCertPath:       "security/producer.crt",
-               ProducerKeyPath:        "security/producer.key",
-       }
-       got := New()
-       assertions.Equal(&wantConfig, got)
-       logString := buf.String()
-       assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
-}
-
-func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
-       assertions := require.New(t)
-       var buf bytes.Buffer
-       log.SetOutput(&buf)
-
-       os.Setenv("LOG_LEVEL", "wrong")
-       t.Cleanup(func() {
-               log.SetOutput(os.Stderr)
-               os.Clearenv()
-       })
-
-       wantConfig := Config{
-               LogLevel:               log.InfoLevel,
-               InfoProducerHost:       "",
-               InfoProducerPort:       8085,
-               InfoCoordinatorAddress: "https://informationservice:8434",
-               DMaaPMRAddress:         "https://message-router.onap:3905",
-               KafkaBootstrapServers:  "localhost:9092",
-               ProducerCertPath:       "security/producer.crt",
-               ProducerKeyPath:        "security/producer.key",
-       }
-
-       got := New()
-
-       assertions.Equal(&wantConfig, got)
-       logString := buf.String()
-       assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
-}
-
-func TestGetJobTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) {
-       assertions := require.New(t)
-       typesDir := CreateTypeConfigFiles(t)
-       t.Cleanup(func() {
-               os.RemoveAll(typesDir)
-       })
-
-       var typeSchemaObj interface{}
-       json.Unmarshal([]byte(typeSchemaFileContent), &typeSchemaObj)
-
-       types, err := GetJobTypesFromConfiguration(typesDir)
-
-       wantedDMaaPType := TypeDefinition{
-               Identity:      "type1",
-               DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
-               TypeSchema:    typeSchemaObj,
-       }
-       wantedKafkaType := TypeDefinition{
-               Identity:        "type2",
-               KafkaInputTopic: "TestTopic",
-               TypeSchema:      typeSchemaObj,
-       }
-       wantedTypes := []TypeDefinition{wantedDMaaPType, wantedKafkaType}
-       assertions.EqualValues(wantedTypes, types)
-       assertions.Nil(err)
-}
-
-const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}, {"id": "type2", "kafkaInputTopic": "TestTopic"}]}`
-const typeSchemaFileContent = `{
-       "$schema": "http://json-schema.org/draft-04/schema#",
-       "type": "object",
-       "properties": {
-         "filter": {
-                "type": "string"
-          }
-       },
-       "additionalProperties": false
-  }`
-
-func CreateTypeConfigFiles(t *testing.T) string {
-       typesDir, err := os.MkdirTemp("", "configs")
-       if err != nil {
-               t.Errorf("Unable to create temporary directory for types due to: %v", err)
-       }
-       fname := filepath.Join(typesDir, "type_config.json")
-       if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
-               t.Errorf("Unable to create temporary config file for types due to: %v", err)
-       }
-       fname = filepath.Join(typesDir, "typeSchemaDmaap.json")
-       if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
-               t.Errorf("Unable to create temporary schema file for DMaaP type due to: %v", err)
-       }
-       fname = filepath.Join(typesDir, "typeSchemaKafka.json")
-       if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
-               t.Errorf("Unable to create temporary schema file for Kafka type due to: %v", err)
-       }
-       return typesDir
-}
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
deleted file mode 100644 (file)
index 1dd0ad1..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package config
-
-import (
-       "encoding/json"
-       "fmt"
-       "net/url"
-
-       log "github.com/sirupsen/logrus"
-
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
-)
-
-const registerTypePath = "/data-producer/v1/info-types/"
-const registerProducerPath = "/data-producer/v1/info-producers/"
-
-type TypeDefinition struct {
-       Identity        string `json:"id"`
-       DMaaPTopicURL   string `json:"dmaapTopicUrl"`
-       KafkaInputTopic string `json:"kafkaInputTopic"`
-       TypeSchema      interface{}
-}
-
-func (td TypeDefinition) IsKafkaType() bool {
-       return td.KafkaInputTopic != ""
-}
-
-func (td TypeDefinition) IsDMaaPType() bool {
-       return td.DMaaPTopicURL != ""
-}
-
-type ProducerRegistrationInfo struct {
-       InfoProducerSupervisionCallbackUrl string   `json:"info_producer_supervision_callback_url"`
-       SupportedInfoTypes                 []string `json:"supported_info_types"`
-       InfoJobCallbackUrl                 string   `json:"info_job_callback_url"`
-}
-
-type Registrator interface {
-       RegisterTypes(types []TypeDefinition) error
-       RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
-}
-
-type RegistratorImpl struct {
-       infoCoordinatorAddress string
-       httpClient             restclient.HTTPClient
-}
-
-func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *RegistratorImpl {
-       return &RegistratorImpl{
-               infoCoordinatorAddress: infoCoordAddr,
-               httpClient:             client,
-       }
-}
-
-func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error {
-       for _, jobType := range jobTypes {
-               s, _ := json.Marshal(jobType.TypeSchema)
-               body := fmt.Sprintf(`{"info_job_data_schema": %v}`, string(s))
-               if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Identity), []byte(body), r.httpClient); error != nil {
-                       return error
-               }
-               log.Debugf("Registered type: %v", jobType)
-       }
-       return nil
-}
-
-func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error {
-       if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil {
-               if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body), r.httpClient); putErr != nil {
-                       return putErr
-               }
-               log.Debugf("Registered producer: %v", producerId)
-               return nil
-       } else {
-               return marshalErr
-       }
-}
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
deleted file mode 100644 (file)
index b2f10cc..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package config
-
-import (
-       "encoding/json"
-       "io/ioutil"
-       "net/http"
-       "testing"
-
-       "github.com/stretchr/testify/mock"
-       "github.com/stretchr/testify/require"
-       "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient"
-)
-
-func TestRegisterTypes(t *testing.T) {
-       assertions := require.New(t)
-
-       clientMock := httpclient.HTTPClient{}
-
-       clientMock.On("Do", mock.Anything).Return(&http.Response{
-               StatusCode: http.StatusCreated,
-       }, nil)
-
-       schemaString := `{
-               "type": "object",
-               "properties": {},
-               "additionalProperties": false
-               }`
-       var schemaObj interface{}
-       json.Unmarshal([]byte(schemaString), &schemaObj)
-
-       type1 := TypeDefinition{
-               Identity:   "Type1",
-               TypeSchema: schemaObj,
-       }
-       types := []TypeDefinition{type1}
-
-       r := NewRegistratorImpl("http://localhost:9990", &clientMock)
-       err := r.RegisterTypes(types)
-
-       assertions.Nil(err)
-       var actualRequest *http.Request
-       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
-               actualRequest = req
-               return true
-       }))
-       assertions.Equal(http.MethodPut, actualRequest.Method)
-       assertions.Equal("http", actualRequest.URL.Scheme)
-       assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path)
-       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
-       body, _ := ioutil.ReadAll(actualRequest.Body)
-       expectedBody := []byte(`{"info_job_data_schema": {"additionalProperties":false,"properties":{},"type":"object"}}`)
-       assertions.Equal(expectedBody, body)
-       clientMock.AssertNumberOfCalls(t, "Do", 1)
-}
-
-func TestRegisterProducer(t *testing.T) {
-       assertions := require.New(t)
-
-       clientMock := httpclient.HTTPClient{}
-
-       clientMock.On("Do", mock.Anything).Return(&http.Response{
-               StatusCode: http.StatusCreated,
-       }, nil)
-
-       producer := ProducerRegistrationInfo{
-               InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
-               SupportedInfoTypes:                 []string{"type1"},
-               InfoJobCallbackUrl:                 "jobCallbackUrl",
-       }
-
-       r := NewRegistratorImpl("http://localhost:9990", &clientMock)
-       err := r.RegisterProducer("Producer1", &producer)
-
-       assertions.Nil(err)
-       var actualRequest *http.Request
-       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
-               actualRequest = req
-               return true
-       }))
-       assertions.Equal(http.MethodPut, actualRequest.Method)
-       assertions.Equal("http", actualRequest.URL.Scheme)
-       assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path)
-       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
-       body, _ := ioutil.ReadAll(actualRequest.Body)
-       expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`)
-       assertions.Equal(expectedBody, body)
-       clientMock.AssertNumberOfCalls(t, "Do", 1)
-}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
deleted file mode 100644 (file)
index 86bfe05..0000000
+++ /dev/null
@@ -1,461 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package jobs
-
-import (
-       "fmt"
-       "strings"
-       "sync"
-       "time"
-
-       "github.com/confluentinc/confluent-kafka-go/kafka"
-       log "github.com/sirupsen/logrus"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
-)
-
-type TypeData struct {
-       Identity    string `json:"id"`
-       jobsHandler *jobsHandler
-}
-
-type sourceType string
-
-const dMaaPSource = sourceType("dmaap")
-const kafkaSource = sourceType("kafka")
-
-type JobInfo struct {
-       Owner            string     `json:"owner"`
-       LastUpdated      string     `json:"last_updated"`
-       InfoJobIdentity  string     `json:"info_job_identity"`
-       TargetUri        string     `json:"target_uri"`
-       InfoJobData      Parameters `json:"info_job_data"`
-       InfoTypeIdentity string     `json:"info_type_identity"`
-       sourceType       sourceType
-} // @name JobInfo
-
-type JobTypesManager interface {
-       LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
-       GetSupportedTypes() []string
-}
-
-type JobsManager interface {
-       AddJobFromRESTCall(JobInfo) error
-       DeleteJobFromRESTCall(jobId string)
-}
-
-type JobsManagerImpl struct {
-       allTypes         map[string]TypeData
-       pollClient       restclient.HTTPClient
-       mrAddress        string
-       kafkaFactory     kafkaclient.KafkaFactory
-       distributeClient restclient.HTTPClient
-}
-
-func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
-       return &JobsManagerImpl{
-               allTypes:         make(map[string]TypeData),
-               pollClient:       pollClient,
-               mrAddress:        mrAddr,
-               kafkaFactory:     kafkaFactory,
-               distributeClient: distributeClient,
-       }
-}
-
-func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
-       if err := jm.validateJobInfo(ji); err == nil {
-               typeData := jm.allTypes[ji.InfoTypeIdentity]
-               ji.sourceType = typeData.jobsHandler.sourceType
-               typeData.jobsHandler.addJobCh <- ji
-               log.Debug("Added job: ", ji)
-               return nil
-       } else {
-               return err
-       }
-}
-
-func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
-       for _, typeData := range jm.allTypes {
-               log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
-               typeData.jobsHandler.deleteJobCh <- jobId
-       }
-       log.Debug("Deleted job: ", jobId)
-}
-
-func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
-       if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
-               return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
-       }
-       if ji.InfoJobIdentity == "" {
-               return fmt.Errorf("missing required job identity: %v", ji)
-       }
-       // Temporary for when there are only REST callbacks needed
-       if ji.TargetUri == "" {
-               return fmt.Errorf("missing required target URI: %v", ji)
-       }
-       return nil
-}
-
-func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
-       for _, typeDef := range types {
-               if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
-                       log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
-               }
-               jm.allTypes[typeDef.Identity] = TypeData{
-                       Identity:    typeDef.Identity,
-                       jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
-               }
-       }
-       return types
-}
-
-func (jm *JobsManagerImpl) GetSupportedTypes() []string {
-       supportedTypes := []string{}
-       for k := range jm.allTypes {
-               supportedTypes = append(supportedTypes, k)
-       }
-       return supportedTypes
-}
-
-func (jm *JobsManagerImpl) StartJobsForAllTypes() {
-       for _, jobType := range jm.allTypes {
-
-               go jobType.jobsHandler.startPollingAndDistribution()
-
-       }
-}
-
-type jobsHandler struct {
-       mu               sync.Mutex
-       typeId           string
-       sourceType       sourceType
-       pollingAgent     pollingAgent
-       jobs             map[string]job
-       addJobCh         chan JobInfo
-       deleteJobCh      chan string
-       distributeClient restclient.HTTPClient
-}
-
-func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
-       pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
-       sourceType := kafkaSource
-       if typeDef.DMaaPTopicURL != "" {
-               sourceType = dMaaPSource
-       }
-       return &jobsHandler{
-               typeId:           typeDef.Identity,
-               sourceType:       sourceType,
-               pollingAgent:     pollingAgent,
-               jobs:             make(map[string]job),
-               addJobCh:         make(chan JobInfo),
-               deleteJobCh:      make(chan string),
-               distributeClient: distributeClient,
-       }
-}
-
-func (jh *jobsHandler) startPollingAndDistribution() {
-       go func() {
-               for {
-                       jh.pollAndDistributeMessages()
-               }
-       }()
-
-       go func() {
-               for {
-                       jh.monitorManagementChannels()
-               }
-       }()
-}
-
-func (jh *jobsHandler) pollAndDistributeMessages() {
-       log.Debugf("Processing jobs for type: %v", jh.typeId)
-       messagesBody, error := jh.pollingAgent.pollMessages()
-       if error != nil {
-               log.Warn("Error getting data from source. Cause: ", error)
-               time.Sleep(time.Minute) // Must wait before trying to call data source again
-               return
-       }
-       jh.distributeMessages(messagesBody)
-}
-
-func (jh *jobsHandler) distributeMessages(messages []byte) {
-       if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
-               log.Debug("Distributing messages: ", string(messages))
-               jh.mu.Lock()
-               defer jh.mu.Unlock()
-               for _, job := range jh.jobs {
-                       if len(job.messagesChannel) < cap(job.messagesChannel) {
-                               job.messagesChannel <- messages
-                       } else {
-                               jh.emptyMessagesBuffer(job)
-                       }
-               }
-       }
-}
-
-func (jh *jobsHandler) emptyMessagesBuffer(job job) {
-       log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
-out:
-       for {
-               select {
-               case <-job.messagesChannel:
-               default:
-                       break out
-               }
-       }
-}
-
-func (jh *jobsHandler) monitorManagementChannels() {
-       select {
-       case addedJob := <-jh.addJobCh:
-               jh.addJob(addedJob)
-       case deletedJob := <-jh.deleteJobCh:
-               jh.deleteJob(deletedJob)
-       }
-}
-
-func (jh *jobsHandler) addJob(addedJob JobInfo) {
-       jh.mu.Lock()
-       log.Debug("Add job: ", addedJob)
-       newJob := newJob(addedJob, jh.distributeClient)
-       go newJob.start()
-       jh.jobs[addedJob.InfoJobIdentity] = newJob
-       jh.mu.Unlock()
-}
-
-func (jh *jobsHandler) deleteJob(deletedJob string) {
-       jh.mu.Lock()
-       log.Debug("Delete job: ", deletedJob)
-       j, exist := jh.jobs[deletedJob]
-       if exist {
-               j.controlChannel <- struct{}{}
-               delete(jh.jobs, deletedJob)
-       }
-       jh.mu.Unlock()
-}
-
-type pollingAgent interface {
-       pollMessages() ([]byte, error)
-}
-
-func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
-       if typeDef.DMaaPTopicURL != "" {
-               return dMaaPPollingAgent{
-                       messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
-                       pollClient:       pollClient,
-               }
-       } else {
-               return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
-       }
-}
-
-type dMaaPPollingAgent struct {
-       messageRouterURL string
-       pollClient       restclient.HTTPClient
-}
-
-func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
-       return restclient.Get(pa.messageRouterURL, pa.pollClient)
-}
-
-type kafkaPollingAgent struct {
-       kafkaClient kafkaclient.KafkaClient
-}
-
-func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
-       c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
-       if err != nil {
-               log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
-       }
-       return kafkaPollingAgent{
-               kafkaClient: c,
-       }
-}
-
-func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
-       msg, err := pa.kafkaClient.ReadMessage()
-       if err == nil {
-               return msg, nil
-       } else {
-               if isKafkaTimedOutError(err) {
-                       return []byte(""), nil
-               }
-               return nil, err
-       }
-}
-
-func isKafkaTimedOutError(err error) bool {
-       kafkaErr, ok := err.(kafka.Error)
-       return ok && kafkaErr.Code() == kafka.ErrTimedOut
-}
-
-type job struct {
-       jobInfo         JobInfo
-       client          restclient.HTTPClient
-       messagesChannel chan []byte
-       controlChannel  chan struct{}
-}
-
-func newJob(j JobInfo, c restclient.HTTPClient) job {
-
-       return job{
-               jobInfo:         j,
-               client:          c,
-               messagesChannel: make(chan []byte, 10),
-               controlChannel:  make(chan struct{}),
-       }
-}
-
-type Parameters struct {
-       BufferTimeout BufferTimeout `json:"bufferTimeout"`
-} // @name Parameters
-
-type BufferTimeout struct {
-       MaxSize            int   `json:"maxSize"`
-       MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
-} // @name BufferTimeout
-
-func (j *job) start() {
-       if j.isJobBuffered() {
-               j.startReadingMessagesBuffered()
-       } else {
-               j.startReadingSingleMessages()
-       }
-}
-
-func (j *job) startReadingSingleMessages() {
-out:
-       for {
-               select {
-               case <-j.controlChannel:
-                       log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
-                       break out
-               case msg := <-j.messagesChannel:
-                       j.sendMessagesToConsumer(msg)
-               }
-       }
-}
-
-func (j *job) startReadingMessagesBuffered() {
-out:
-       for {
-               select {
-               case <-j.controlChannel:
-                       log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
-                       break out
-               default:
-                       msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
-                       if len(msgs) > 0 {
-                               j.sendMessagesToConsumer(msgs)
-                       }
-               }
-       }
-}
-
-func (j *job) read(bufferParams BufferTimeout) []byte {
-       wg := sync.WaitGroup{}
-       wg.Add(bufferParams.MaxSize)
-       rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
-       c := make(chan struct{})
-       go func() {
-               i := 0
-       out:
-               for {
-                       select {
-                       case <-c:
-                               break out
-                       case msg := <-j.messagesChannel:
-                               rawMsgs = append(rawMsgs, msg)
-                               i++
-                               wg.Done()
-                               if i == bufferParams.MaxSize {
-                                       break out
-                               }
-                       }
-               }
-       }()
-       j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
-       close(c)
-       return getAsJSONArray(rawMsgs)
-}
-
-func getAsJSONArray(rawMsgs [][]byte) []byte {
-       if len(rawMsgs) == 0 {
-               return []byte("")
-       }
-       strings := ""
-       for i := 0; i < len(rawMsgs); i++ {
-               strings = strings + makeIntoString(rawMsgs[i])
-               strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
-       }
-       return []byte(wrapInJSONArray(strings))
-}
-
-func makeIntoString(rawMsg []byte) string {
-       return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
-}
-
-func addSeparatorIfNeeded(strings string, position, length int) string {
-       if position < length-1 {
-               strings = strings + ","
-       }
-       return strings
-}
-
-func wrapInJSONArray(strings string) string {
-       return "[" + strings + "]"
-}
-
-func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
-       c := make(chan struct{})
-       go func() {
-               defer close(c)
-               wg.Wait()
-       }()
-       select {
-       case <-c:
-               return false // completed normally
-       case <-time.After(timeout):
-               return true // timed out
-       }
-}
-
-func (j *job) sendMessagesToConsumer(messages []byte) {
-       log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
-       contentType := restclient.ContentTypeJSON
-       if j.isJobKafka() && !j.isJobBuffered() {
-               contentType = restclient.ContentTypePlain
-       }
-       if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
-               log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
-               return
-       }
-       log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
-}
-
-func (j *job) isJobBuffered() bool {
-       return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
-}
-
-func (j *job) isJobKafka() bool {
-       return j.jobInfo.sourceType == kafkaSource
-}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
deleted file mode 100644 (file)
index 7d02104..0000000
+++ /dev/null
@@ -1,509 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package jobs
-
-import (
-       "bytes"
-       "fmt"
-       "io/ioutil"
-       "net/http"
-       "strconv"
-       "sync"
-       "testing"
-       "time"
-
-       "github.com/confluentinc/confluent-kafka-go/kafka"
-       "github.com/stretchr/testify/mock"
-       "github.com/stretchr/testify/require"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
-       "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
-)
-
-func TestJobsManagerLoadTypesFromConfiguration_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
-       assertions := require.New(t)
-
-       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
-
-       wantedDMaaPType := config.TypeDefinition{
-               Identity:      "type1",
-               DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
-       }
-       wantedKafkaType := config.TypeDefinition{
-               Identity:        "type2",
-               KafkaInputTopic: "topic",
-       }
-       wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType}
-
-       types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
-
-       assertions.EqualValues(wantedTypes, types)
-
-       supportedTypes := managerUnderTest.GetSupportedTypes()
-       assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
-       assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType)
-       assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType)
-}
-
-func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
-       assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
-       wantedJob := JobInfo{
-               Owner:            "owner",
-               LastUpdated:      "now",
-               InfoJobIdentity:  "job1",
-               TargetUri:        "target",
-               InfoJobData:      Parameters{},
-               InfoTypeIdentity: "type1",
-       }
-       jobsHandler := jobsHandler{
-               addJobCh: make(chan JobInfo)}
-       managerUnderTest.allTypes["type1"] = TypeData{
-               Identity:    "type1",
-               jobsHandler: &jobsHandler,
-       }
-
-       var err error
-       go func() {
-               err = managerUnderTest.AddJobFromRESTCall(wantedJob)
-       }()
-
-       assertions.Nil(err)
-       addedJob := <-jobsHandler.addJobCh
-       assertions.Equal(wantedJob, addedJob)
-}
-
-func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
-       assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
-       jobInfo := JobInfo{
-               InfoTypeIdentity: "type1",
-       }
-
-       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
-       assertions.NotNil(err)
-       assertions.Equal("type not supported: type1", err.Error())
-}
-
-func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
-       assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
-       managerUnderTest.allTypes["type1"] = TypeData{
-               Identity: "type1",
-       }
-
-       jobInfo := JobInfo{
-               InfoTypeIdentity: "type1",
-       }
-       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
-       assertions.NotNil(err)
-       assertions.Equal("missing required job identity: {    {{0 0}} type1 }", err.Error())
-}
-
-func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
-       assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
-       managerUnderTest.allTypes["type1"] = TypeData{
-               Identity: "type1",
-       }
-
-       jobInfo := JobInfo{
-               InfoTypeIdentity: "type1",
-               InfoJobIdentity:  "job1",
-       }
-       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
-       assertions.NotNil(err)
-       assertions.Equal("missing required target URI: {  job1  {{0 0}} type1 }", err.Error())
-}
-
-func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
-       assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
-       jobsHandler := jobsHandler{
-               deleteJobCh: make(chan string)}
-       managerUnderTest.allTypes["type1"] = TypeData{
-               Identity:    "type1",
-               jobsHandler: &jobsHandler,
-       }
-
-       go managerUnderTest.DeleteJobFromRESTCall("job2")
-
-       assertions.Equal("job2", <-jobsHandler.deleteJobCh)
-}
-
-func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) {
-       assertions := require.New(t)
-
-       called := false
-       dMaaPMessages := `[{"message": {"data": "dmaap"}}]`
-       pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
-               if req.URL.String() == "http://mrAddr/topicUrl" {
-                       assertions.Equal(req.Method, "GET")
-                       body := "[]"
-                       if !called {
-                               called = true
-                               body = dMaaPMessages
-                       }
-                       return &http.Response{
-                               StatusCode: http.StatusOK,
-                               Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               }
-               t.Error("Wrong call to client: ", req)
-               t.Fail()
-               return nil
-       })
-
-       wg := sync.WaitGroup{}
-       distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
-               if req.URL.String() == "http://consumerHost/dmaaptarget" {
-                       assertions.Equal(req.Method, "POST")
-                       assertions.Equal(dMaaPMessages, getBodyAsString(req, t))
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
-                       wg.Done()
-                       return &http.Response{
-                               StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               }
-               t.Error("Wrong call to client: ", req)
-               t.Fail()
-               return nil
-       })
-       dMaaPTypeDef := config.TypeDefinition{
-               Identity:      "type1",
-               DMaaPTopicURL: "/topicUrl",
-       }
-       dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock)
-
-       jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock)
-       jobsManager.allTypes["type1"] = TypeData{
-               Identity:    "type1",
-               jobsHandler: dMaaPJobsHandler,
-       }
-       jobsManager.StartJobsForAllTypes()
-
-       dMaaPJobInfo := JobInfo{
-               InfoTypeIdentity: "type1",
-               InfoJobIdentity:  "job1",
-               TargetUri:        "http://consumerHost/dmaaptarget",
-       }
-
-       wg.Add(1) // Wait till the distribution has happened
-       err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo)
-       assertions.Nil(err)
-
-       if waitTimeout(&wg, 2*time.Second) {
-               t.Error("Not all calls to server were made")
-               t.Fail()
-       }
-}
-
-func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) {
-       assertions := require.New(t)
-
-       kafkaMessages := `1`
-       wg := sync.WaitGroup{}
-       distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
-               if req.URL.String() == "http://consumerHost/kafkatarget" {
-                       assertions.Equal(req.Method, "POST")
-                       assertions.Equal(kafkaMessages, getBodyAsString(req, t))
-                       assertions.Equal("text/plain", req.Header.Get("Content-Type"))
-                       wg.Done()
-                       return &http.Response{
-                               StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               }
-               t.Error("Wrong call to client: ", req)
-               t.Fail()
-               return nil
-       })
-
-       kafkaTypeDef := config.TypeDefinition{
-               Identity:        "type2",
-               KafkaInputTopic: "topic",
-       }
-       kafkaFactoryMock := mocks.KafkaFactory{}
-       kafkaConsumerMock := mocks.KafkaConsumer{}
-       kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
-       kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
-       kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{
-               Value: []byte(kafkaMessages),
-       }, error(nil)).Once()
-       kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop"))
-       kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
-       kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock)
-
-       jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock)
-       jobsManager.allTypes["type2"] = TypeData{
-               Identity:    "type2",
-               jobsHandler: kafkaJobsHandler,
-       }
-
-       jobsManager.StartJobsForAllTypes()
-
-       kafkaJobInfo := JobInfo{
-               InfoTypeIdentity: "type2",
-               InfoJobIdentity:  "job2",
-               TargetUri:        "http://consumerHost/kafkatarget",
-       }
-
-       wg.Add(1) // Wait till the distribution has happened
-       err := jobsManager.AddJobFromRESTCall(kafkaJobInfo)
-       assertions.Nil(err)
-
-       if waitTimeout(&wg, 2*time.Second) {
-               t.Error("Not all calls to server were made")
-               t.Fail()
-       }
-}
-
-func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
-       jobToDelete := newJob(JobInfo{}, nil)
-       go jobToDelete.start()
-       typeDef := config.TypeDefinition{
-               Identity:      "type1",
-               DMaaPTopicURL: "/topicUrl",
-       }
-       jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
-       jobsHandler.jobs["job1"] = jobToDelete
-
-       go jobsHandler.monitorManagementChannels()
-
-       jobsHandler.deleteJobCh <- "job1"
-
-       deleted := false
-       for i := 0; i < 100; i++ {
-               if len(jobsHandler.jobs) == 0 {
-                       deleted = true
-                       break
-               }
-               time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
-       }
-       require.New(t).True(deleted, "Job not deleted")
-}
-
-func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
-       job := newJob(JobInfo{
-               InfoJobIdentity: "job",
-       }, nil)
-
-       typeDef := config.TypeDefinition{
-               Identity:      "type1",
-               DMaaPTopicURL: "/topicUrl",
-       }
-       jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
-       jobsHandler.jobs["job1"] = job
-
-       fillMessagesBuffer(job.messagesChannel)
-
-       jobsHandler.distributeMessages([]byte("sent msg"))
-
-       require.New(t).Len(job.messagesChannel, 0)
-}
-
-func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) {
-       assertions := require.New(t)
-
-       kafkaFactoryMock := mocks.KafkaFactory{}
-       kafkaConsumerMock := mocks.KafkaConsumer{}
-       kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
-       kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
-       kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false))
-       kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
-
-       pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "")
-       messages, err := pollingAgentUnderTest.pollMessages()
-
-       assertions.Equal([]byte(""), messages)
-       assertions.Nil(err)
-}
-
-func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
-       assertions := require.New(t)
-
-       wg := sync.WaitGroup{}
-       messageNo := 1
-       distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
-               if req.URL.String() == "http://consumerHost/target" {
-                       assertions.Equal(req.Method, "POST")
-                       assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
-                       messageNo++
-                       assertions.Equal("text/plain", req.Header.Get("Content-Type"))
-                       wg.Done()
-                       return &http.Response{
-                               StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               }
-               t.Error("Wrong call to client: ", req)
-               t.Fail()
-               return nil
-       })
-
-       jobUnderTest := newJob(JobInfo{
-               sourceType: kafkaSource,
-               TargetUri:  "http://consumerHost/target",
-       }, distributeClientMock)
-
-       wg.Add(2)
-       go jobUnderTest.start()
-
-       jobUnderTest.messagesChannel <- []byte("message1")
-       jobUnderTest.messagesChannel <- []byte("message2")
-
-       if waitTimeout(&wg, 2*time.Second) {
-               t.Error("Not all calls to server were made")
-               t.Fail()
-       }
-}
-
-func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
-       assertions := require.New(t)
-
-       wg := sync.WaitGroup{}
-       distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
-               if req.URL.String() == "http://consumerHost/target" {
-                       assertions.Equal(req.Method, "POST")
-                       assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t))
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
-                       wg.Done()
-                       return &http.Response{
-                               StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               }
-               t.Error("Wrong call to client: ", req)
-               t.Fail()
-               return nil
-       })
-
-       jobUnderTest := newJob(JobInfo{
-               TargetUri: "http://consumerHost/target",
-               InfoJobData: Parameters{
-                       BufferTimeout: BufferTimeout{
-                               MaxSize:            5,
-                               MaxTimeMiliseconds: 200,
-                       },
-               },
-       }, distributeClientMock)
-
-       wg.Add(1)
-       go jobUnderTest.start()
-
-       go func() {
-               jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
-               jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
-               jobUnderTest.messagesChannel <- []byte("ABCDEFGH")
-       }()
-
-       if waitTimeout(&wg, 2*time.Second) {
-               t.Error("Not all calls to server were made")
-               t.Fail()
-       }
-}
-
-func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) {
-       assertions := require.New(t)
-
-       jobUnderTest := newJob(JobInfo{}, nil)
-
-       go func() {
-               for i := 0; i < 4; i++ {
-                       jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
-               }
-       }()
-
-       msgs := jobUnderTest.read(BufferTimeout{
-               MaxSize:            2,
-               MaxTimeMiliseconds: 200,
-       })
-
-       assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
-}
-func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
-       assertions := require.New(t)
-
-       jobUnderTest := newJob(JobInfo{}, nil)
-
-       go func() {
-               for i := 0; i < 4; i++ {
-                       time.Sleep(10 * time.Millisecond)
-                       jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
-               }
-       }()
-
-       msgs := jobUnderTest.read(BufferTimeout{
-               MaxSize:            2,
-               MaxTimeMiliseconds: 30,
-       })
-
-       assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
-}
-
-func fillMessagesBuffer(mc chan []byte) {
-       for i := 0; i < cap(mc); i++ {
-               mc <- []byte("msg")
-       }
-}
-
-type RoundTripFunc func(req *http.Request) *http.Response
-
-func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
-       return f(req), nil
-}
-
-//NewTestClient returns *http.Client with Transport replaced to avoid making real calls
-func NewTestClient(fn RoundTripFunc) *http.Client {
-       return &http.Client{
-               Transport: RoundTripFunc(fn),
-       }
-}
-
-// waitTimeout waits for the waitgroup for the specified max timeout.
-// Returns true if waiting timed out.
-func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
-       c := make(chan struct{})
-       go func() {
-               defer close(c)
-               wg.Wait()
-       }()
-       select {
-       case <-c:
-               return false // completed normally
-       case <-time.After(timeout):
-               return true // timed out
-       }
-}
-
-func getBodyAsString(req *http.Request, t *testing.T) string {
-       buf := new(bytes.Buffer)
-       if _, err := buf.ReadFrom(req.Body); err != nil {
-               t.Fail()
-       }
-       return buf.String()
-}
diff --git a/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go
deleted file mode 100644 (file)
index 16abcb4..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package kafkaclient
-
-import (
-       "time"
-
-       "github.com/confluentinc/confluent-kafka-go/kafka"
-)
-
-type KafkaFactory interface {
-       NewKafkaConsumer(topicID string) (KafkaConsumer, error)
-}
-
-type KafkaFactoryImpl struct {
-       BootstrapServer string
-}
-
-func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
-       consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
-               "bootstrap.servers": kf.BootstrapServer,
-               "group.id":          "dmaap-mediator-producer",
-               "auto.offset.reset": "earliest",
-       })
-       if err != nil {
-               return nil, err
-       }
-       return KafkaConsumerImpl{consumer: consumer}, nil
-}
-
-func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
-       consumer, err := factory.NewKafkaConsumer(topicID)
-       if err != nil {
-               return KafkaClient{}, err
-       }
-       consumer.Commit()
-       err = consumer.Subscribe(topicID)
-       if err != nil {
-               return KafkaClient{}, err
-       }
-       return KafkaClient{consumer: consumer}, nil
-}
-
-type KafkaClient struct {
-       consumer KafkaConsumer
-}
-
-func (kc KafkaClient) ReadMessage() ([]byte, error) {
-       msg, err := kc.consumer.ReadMessage(time.Second)
-       if err != nil {
-               return nil, err
-       }
-       return msg.Value, nil
-}
-
-type KafkaConsumer interface {
-       Commit() ([]kafka.TopicPartition, error)
-       Subscribe(topic string) (err error)
-       ReadMessage(timeout time.Duration) (*kafka.Message, error)
-}
-
-type KafkaConsumerImpl struct {
-       consumer *kafka.Consumer
-}
-
-func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
-       return kc.consumer.Commit()
-}
-
-func (kc KafkaConsumerImpl) Subscribe(topic string) error {
-       return kc.consumer.Subscribe(topic, nil)
-}
-
-func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
-       return kc.consumer.ReadMessage(timeout)
-}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
deleted file mode 100644 (file)
index a7582c2..0000000
+++ /dev/null
@@ -1,177 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package restclient
-
-import (
-       "bytes"
-       "crypto/tls"
-       "fmt"
-       "io"
-       "math"
-       "net/http"
-       "net/url"
-       "time"
-
-       "github.com/hashicorp/go-retryablehttp"
-       log "github.com/sirupsen/logrus"
-)
-
-const ContentTypeJSON = "application/json"
-const ContentTypePlain = "text/plain"
-
-// HTTPClient interface
-type HTTPClient interface {
-       Get(url string) (*http.Response, error)
-
-       Do(*http.Request) (*http.Response, error)
-}
-
-type RequestError struct {
-       StatusCode int
-       Body       []byte
-}
-
-func (pe RequestError) Error() string {
-       return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
-}
-
-func Get(url string, client HTTPClient) ([]byte, error) {
-       if response, err := client.Get(url); err == nil {
-               if isResponseSuccess(response.StatusCode) {
-                       defer response.Body.Close()
-                       if responseData, err := io.ReadAll(response.Body); err == nil {
-                               return responseData, nil
-                       } else {
-                               return nil, err
-                       }
-               } else {
-                       return nil, getRequestError(response)
-               }
-       } else {
-               return nil, err
-       }
-}
-
-func Put(url string, body []byte, client HTTPClient) error {
-       return do(http.MethodPut, url, body, ContentTypeJSON, client)
-}
-
-func Post(url string, body []byte, contentType string, client HTTPClient) error {
-       return do(http.MethodPost, url, body, contentType, client)
-}
-
-func do(method string, url string, body []byte, contentType string, client HTTPClient) error {
-       if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
-               req.Header.Set("Content-Type", contentType)
-               if response, respErr := client.Do(req); respErr == nil {
-                       if isResponseSuccess(response.StatusCode) {
-                               return nil
-                       } else {
-                               return getRequestError(response)
-                       }
-               } else {
-                       return respErr
-               }
-       } else {
-               return reqErr
-       }
-}
-
-func isResponseSuccess(statusCode int) bool {
-       return statusCode >= http.StatusOK && statusCode <= 299
-}
-
-func getRequestError(response *http.Response) RequestError {
-       defer response.Body.Close()
-       responseData, _ := io.ReadAll(response.Body)
-       putError := RequestError{
-               StatusCode: response.StatusCode,
-               Body:       responseData,
-       }
-       return putError
-}
-
-func CreateClientCertificate(certPath string, keyPath string) (tls.Certificate, error) {
-       if cert, err := tls.LoadX509KeyPair(certPath, keyPath); err == nil {
-               return cert, nil
-       } else {
-               return tls.Certificate{}, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", certPath, keyPath, err)
-       }
-}
-
-func CreateRetryClient(cert tls.Certificate) *http.Client {
-       rawRetryClient := retryablehttp.NewClient()
-       rawRetryClient.Logger = leveledLogger{}
-       rawRetryClient.RetryWaitMax = time.Minute
-       rawRetryClient.RetryMax = math.MaxInt
-       rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
-
-       client := rawRetryClient.StandardClient()
-       return client
-}
-
-func CreateClientWithoutRetry(cert tls.Certificate, timeout time.Duration) *http.Client {
-       return &http.Client{
-               Timeout:   timeout,
-               Transport: getSecureTransportWithoutVerify(cert),
-       }
-}
-
-func getSecureTransportWithoutVerify(cert tls.Certificate) *http.Transport {
-       return &http.Transport{
-               TLSClientConfig: &tls.Config{
-                       Certificates: []tls.Certificate{
-                               cert,
-                       },
-                       InsecureSkipVerify: true,
-               },
-       }
-}
-
-func IsUrlSecure(configUrl string) bool {
-       u, _ := url.Parse(configUrl)
-       return u.Scheme == "https"
-}
-
-// Used to get leveled logging in the RetryClient
-type leveledLogger struct {
-}
-
-func (ll leveledLogger) Error(msg string, keysAndValues ...interface{}) {
-       log.WithFields(getFields(keysAndValues)).Error(msg)
-}
-func (ll leveledLogger) Info(msg string, keysAndValues ...interface{}) {
-       log.WithFields(getFields(keysAndValues)).Info(msg)
-}
-func (ll leveledLogger) Debug(msg string, keysAndValues ...interface{}) {
-       log.WithFields(getFields(keysAndValues)).Debug(msg)
-}
-func (ll leveledLogger) Warn(msg string, keysAndValues ...interface{}) {
-       log.WithFields(getFields(keysAndValues)).Warn(msg)
-}
-
-func getFields(keysAndValues []interface{}) log.Fields {
-       fields := log.Fields{}
-       for i := 0; i < len(keysAndValues); i = i + 2 {
-               fields[fmt.Sprint(keysAndValues[i])] = keysAndValues[i+1]
-       }
-       return fields
-}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
deleted file mode 100644 (file)
index 90db6ae..0000000
+++ /dev/null
@@ -1,279 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package restclient
-
-import (
-       "bytes"
-       "crypto/tls"
-       "errors"
-       "fmt"
-       "io/ioutil"
-       "math"
-       "net/http"
-       "reflect"
-       "testing"
-       "time"
-
-       "github.com/hashicorp/go-retryablehttp"
-       "github.com/stretchr/testify/mock"
-       "github.com/stretchr/testify/require"
-       "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient"
-)
-
-func TestRequestError_Error(t *testing.T) {
-       assertions := require.New(t)
-       actualError := RequestError{
-               StatusCode: http.StatusBadRequest,
-               Body:       []byte("error"),
-       }
-       assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error())
-}
-func TestGet(t *testing.T) {
-       assertions := require.New(t)
-       type args struct {
-               url              string
-               mockReturnStatus int
-               mockReturnBody   string
-               mockReturnError  error
-       }
-       tests := []struct {
-               name        string
-               args        args
-               want        []byte
-               wantedError error
-       }{
-               {
-                       name: "Test Get with OK response",
-                       args: args{
-                               url:              "http://testOk",
-                               mockReturnStatus: http.StatusOK,
-                               mockReturnBody:   "Response",
-                       },
-                       want: []byte("Response"),
-               },
-               {
-                       name: "Test Get with Not OK response",
-                       args: args{
-                               url:              "http://testNotOk",
-                               mockReturnStatus: http.StatusBadRequest,
-                               mockReturnBody:   "Bad Response",
-                       },
-                       want: nil,
-                       wantedError: RequestError{
-                               StatusCode: http.StatusBadRequest,
-                               Body:       []byte("Bad Response"),
-                       },
-               },
-               {
-                       name: "Test Get with error",
-                       args: args{
-                               url:             "http://testError",
-                               mockReturnError: errors.New("Failed Request"),
-                       },
-                       want:        nil,
-                       wantedError: errors.New("Failed Request"),
-               },
-       }
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       clientMock := httpclient.HTTPClient{}
-                       clientMock.On("Get", tt.args.url).Return(&http.Response{
-                               StatusCode: tt.args.mockReturnStatus,
-                               Body:       ioutil.NopCloser(bytes.NewReader([]byte(tt.args.mockReturnBody))),
-                       }, tt.args.mockReturnError)
-
-                       got, err := Get(tt.args.url, &clientMock)
-                       assertions.Equal(tt.wantedError, err, tt.name)
-                       assertions.Equal(tt.want, got, tt.name)
-                       clientMock.AssertCalled(t, "Get", tt.args.url)
-               })
-       }
-}
-
-func TestPutOk(t *testing.T) {
-       assertions := require.New(t)
-       clientMock := httpclient.HTTPClient{}
-
-       clientMock.On("Do", mock.Anything).Return(&http.Response{
-               StatusCode: http.StatusOK,
-       }, nil)
-
-       if err := Put("http://localhost:9990", []byte("body"), &clientMock); err != nil {
-               t.Errorf("Put() error = %v, did not want error", err)
-       }
-       var actualRequest *http.Request
-       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
-               actualRequest = req
-               return true
-       }))
-       assertions.Equal(http.MethodPut, actualRequest.Method)
-       assertions.Equal("http", actualRequest.URL.Scheme)
-       assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
-       body, _ := ioutil.ReadAll(actualRequest.Body)
-       expectedBody := []byte("body")
-       assertions.Equal(expectedBody, body)
-       clientMock.AssertNumberOfCalls(t, "Do", 1)
-}
-
-func TestPostOk(t *testing.T) {
-       assertions := require.New(t)
-       clientMock := httpclient.HTTPClient{}
-
-       clientMock.On("Do", mock.Anything).Return(&http.Response{
-               StatusCode: http.StatusOK,
-       }, nil)
-
-       if err := Post("http://localhost:9990", []byte("body"), "application/json", &clientMock); err != nil {
-               t.Errorf("Put() error = %v, did not want error", err)
-       }
-       var actualRequest *http.Request
-       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
-               actualRequest = req
-               return true
-       }))
-       assertions.Equal(http.MethodPost, actualRequest.Method)
-       assertions.Equal("http", actualRequest.URL.Scheme)
-       assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
-       body, _ := ioutil.ReadAll(actualRequest.Body)
-       expectedBody := []byte("body")
-       assertions.Equal(expectedBody, body)
-       clientMock.AssertNumberOfCalls(t, "Do", 1)
-}
-
-func Test_doErrorCases(t *testing.T) {
-       assertions := require.New(t)
-       type args struct {
-               url              string
-               mockReturnStatus int
-               mockReturnBody   []byte
-               mockReturnError  error
-       }
-       tests := []struct {
-               name    string
-               args    args
-               wantErr error
-       }{
-               {
-                       name: "Bad request should get RequestError",
-                       args: args{
-                               url:              "badRequest",
-                               mockReturnStatus: http.StatusBadRequest,
-                               mockReturnBody:   []byte("bad request"),
-                               mockReturnError:  nil,
-                       },
-                       wantErr: RequestError{
-                               StatusCode: http.StatusBadRequest,
-                               Body:       []byte("bad request"),
-                       },
-               },
-               {
-                       name: "Server unavailable should get error",
-                       args: args{
-                               url:             "serverUnavailable",
-                               mockReturnError: fmt.Errorf("Server unavailable"),
-                       },
-                       wantErr: fmt.Errorf("Server unavailable"),
-               },
-       }
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       clientMock := httpclient.HTTPClient{}
-                       clientMock.On("Do", mock.Anything).Return(&http.Response{
-                               StatusCode: tt.args.mockReturnStatus,
-                               Body:       ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)),
-                       }, tt.args.mockReturnError)
-                       err := do("PUT", tt.args.url, nil, "", &clientMock)
-                       assertions.Equal(tt.wantErr, err, tt.name)
-               })
-       }
-}
-
-func Test_createClientCertificate(t *testing.T) {
-       assertions := require.New(t)
-       wantedCert, _ := tls.LoadX509KeyPair("../../security/producer.crt", "../../security/producer.key")
-       type args struct {
-               certPath string
-               keyPath  string
-       }
-       tests := []struct {
-               name     string
-               args     args
-               wantCert tls.Certificate
-               wantErr  error
-       }{
-               {
-                       name: "Paths to cert info ok should return cerftificate",
-                       args: args{
-                               certPath: "../../security/producer.crt",
-                               keyPath:  "../../security/producer.key",
-                       },
-                       wantCert: wantedCert,
-               },
-               {
-                       name: "Paths to cert info not ok should return error with info about error",
-                       args: args{
-                               certPath: "wrong_cert",
-                               keyPath:  "wrong_key",
-                       },
-                       wantErr: fmt.Errorf("cannot create x509 keypair from cert file wrong_cert and key file wrong_key due to: open wrong_cert: no such file or directory"),
-               },
-       }
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       cert, err := CreateClientCertificate(tt.args.certPath, tt.args.keyPath)
-                       assertions.Equal(tt.wantCert, cert, tt.name)
-                       assertions.Equal(tt.wantErr, err, tt.name)
-               })
-       }
-}
-
-func Test_CreateRetryClient(t *testing.T) {
-       assertions := require.New(t)
-
-       client := CreateRetryClient(tls.Certificate{})
-
-       transport := client.Transport
-       assertions.Equal("*retryablehttp.RoundTripper", reflect.TypeOf(transport).String())
-       retryableTransport := transport.(*retryablehttp.RoundTripper)
-       retryableClient := retryableTransport.Client
-       assertions.Equal(time.Minute, retryableClient.RetryWaitMax)
-       assertions.Equal(math.MaxInt, retryableClient.RetryMax)
-}
-
-func Test_CreateClientWithoutRetry(t *testing.T) {
-       assertions := require.New(t)
-
-       client := CreateClientWithoutRetry(tls.Certificate{}, 5*time.Second)
-
-       transport := client.Transport
-       assertions.Equal("*http.Transport", reflect.TypeOf(transport).String())
-       assertions.Equal(5*time.Second, client.Timeout)
-}
-
-func TestIsUrlSecured(t *testing.T) {
-       assertions := require.New(t)
-
-       assertions.True(IsUrlSecure("https://url"))
-
-       assertions.False(IsUrlSecure("http://url"))
-}
diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go
deleted file mode 100644 (file)
index 46bc2a2..0000000
+++ /dev/null
@@ -1,158 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package server
-
-import (
-       "encoding/json"
-       "fmt"
-       "io/ioutil"
-       "net/http"
-
-       "github.com/gorilla/mux"
-       log "github.com/sirupsen/logrus"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-)
-
-const HealthCheckPath = "/health_check"
-const AddJobPath = "/info_job"
-const jobIdToken = "infoJobId"
-const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
-const logLevelToken = "level"
-const logAdminPath = "/admin/log"
-
-type ErrorInfo struct {
-       // A URI reference that identifies the problem type.
-       Type string `json:"type" swaggertype:"string"`
-       // A short, human-readable summary of the problem type.
-       Title string `json:"title" swaggertype:"string"`
-       // The HTTP status code generated by the origin server for this occurrence of the problem.
-       Status int `json:"status" swaggertype:"integer" example:"400"`
-       // A human-readable explanation specific to this occurrence of the problem.
-       Detail string `json:"detail" swaggertype:"string" example:"Info job type not found"`
-       // A URI reference that identifies the specific occurrence of the problem.
-       Instance string `json:"instance" swaggertype:"string"`
-} // @name ErrorInfo
-
-type ProducerCallbackHandler struct {
-       jobsManager jobs.JobsManager
-}
-
-func NewProducerCallbackHandler(jm jobs.JobsManager) *ProducerCallbackHandler {
-       return &ProducerCallbackHandler{
-               jobsManager: jm,
-       }
-}
-
-func NewRouter(jm jobs.JobsManager, hcf func(http.ResponseWriter, *http.Request)) *mux.Router {
-       callbackHandler := NewProducerCallbackHandler(jm)
-       r := mux.NewRouter()
-       r.HandleFunc(HealthCheckPath, hcf).Methods(http.MethodGet).Name("health_check")
-       r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add")
-       r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
-       r.HandleFunc(logAdminPath, callbackHandler.setLogLevel).Methods(http.MethodPut).Name("setLogLevel")
-       r.NotFoundHandler = &notFoundHandler{}
-       r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
-       return r
-}
-
-// @Summary      Add info job
-// @Description  Callback for ICS to add an info job
-// @Tags         Data producer (callbacks)
-// @Accept       json
-// @Param        user  body  jobs.JobInfo  true  "Info job data"
-// @Success      200
-// @Failure      400  {object}  ErrorInfo     "Problem as defined in https://tools.ietf.org/html/rfc7807"
-// @Header       400  {string}  Content-Type  "application/problem+json"
-// @Router       /info_job [post]
-func (h *ProducerCallbackHandler) addInfoJobHandler(w http.ResponseWriter, r *http.Request) {
-       b, readErr := ioutil.ReadAll(r.Body)
-       if readErr != nil {
-               returnError(fmt.Sprintf("Unable to read body due to: %v", readErr), w)
-               return
-       }
-       jobInfo := jobs.JobInfo{}
-       if unmarshalErr := json.Unmarshal(b, &jobInfo); unmarshalErr != nil {
-               returnError(fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), w)
-               return
-       }
-       if err := h.jobsManager.AddJobFromRESTCall(jobInfo); err != nil {
-               returnError(fmt.Sprintf("Invalid job info. Cause: %v", err), w)
-               return
-       }
-}
-
-// @Summary      Delete info job
-// @Description  Callback for ICS to delete an info job
-// @Tags         Data producer (callbacks)
-// @Param        infoJobId  path  string  true  "Info job ID"
-// @Success      200
-// @Router       /info_job/{infoJobId} [delete]
-func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) {
-       vars := mux.Vars(r)
-       id, ok := vars[jobIdToken]
-       if !ok {
-               http.Error(w, "Must provide infoJobId.", http.StatusBadRequest)
-               return
-       }
-
-       h.jobsManager.DeleteJobFromRESTCall(id)
-}
-
-// @Summary      Set log level
-// @Description  Set the log level of the producer.
-// @Tags         Admin
-// @Param        level  query  string  false  "string enums"  Enums(Error, Warn, Info, Debug)
-// @Success      200
-// @Failure      400  {object}  ErrorInfo     "Problem as defined in https://tools.ietf.org/html/rfc7807"
-// @Header       400  {string}  Content-Type  "application/problem+json"
-// @Router       /admin/log [put]
-func (h *ProducerCallbackHandler) setLogLevel(w http.ResponseWriter, r *http.Request) {
-       query := r.URL.Query()
-       logLevelStr := query.Get(logLevelToken)
-       if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
-               log.SetLevel(loglevel)
-       } else {
-               returnError(fmt.Sprintf("Invalid log level: %v. Log level will not be changed!", logLevelStr), w)
-               return
-       }
-}
-
-type notFoundHandler struct{}
-
-func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-       http.Error(w, "404 not found.", http.StatusNotFound)
-}
-
-type methodNotAllowedHandler struct{}
-
-func (h *methodNotAllowedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-       http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
-}
-
-func returnError(msg string, w http.ResponseWriter) {
-       errInfo := ErrorInfo{
-               Status: http.StatusBadRequest,
-               Detail: msg,
-       }
-       w.Header().Add("Content-Type", "application/problem+json")
-       w.WriteHeader(http.StatusBadRequest)
-       json.NewEncoder(w).Encode(errInfo)
-}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
deleted file mode 100644 (file)
index dbe503d..0000000
+++ /dev/null
@@ -1,257 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package server
-
-import (
-       "bytes"
-       "encoding/json"
-       "errors"
-       "io"
-       "io/ioutil"
-       "net/http"
-       "net/http/httptest"
-       "testing"
-
-       "github.com/gorilla/mux"
-       "github.com/stretchr/testify/mock"
-       "github.com/stretchr/testify/require"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-       "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobshandler"
-)
-
-func TestNewRouter(t *testing.T) {
-       assertions := require.New(t)
-
-       r := NewRouter(nil, nil)
-       statusRoute := r.Get("health_check")
-       assertions.NotNil(statusRoute)
-       supportedMethods, err := statusRoute.GetMethods()
-       assertions.Equal([]string{http.MethodGet}, supportedMethods)
-       assertions.Nil(err)
-       path, _ := statusRoute.GetPathTemplate()
-       assertions.Equal("/health_check", path)
-
-       addJobRoute := r.Get("add")
-       assertions.NotNil(addJobRoute)
-       supportedMethods, err = addJobRoute.GetMethods()
-       assertions.Equal([]string{http.MethodPost}, supportedMethods)
-       assertions.Nil(err)
-       path, _ = addJobRoute.GetPathTemplate()
-       assertions.Equal("/info_job", path)
-
-       deleteJobRoute := r.Get("delete")
-       assertions.NotNil(deleteJobRoute)
-       supportedMethods, err = deleteJobRoute.GetMethods()
-       assertions.Equal([]string{http.MethodDelete}, supportedMethods)
-       assertions.Nil(err)
-       path, _ = deleteJobRoute.GetPathTemplate()
-       assertions.Equal("/info_job/{infoJobId}", path)
-
-       notFoundHandler := r.NotFoundHandler
-       handler := http.HandlerFunc(notFoundHandler.ServeHTTP)
-       responseRecorder := httptest.NewRecorder()
-       handler.ServeHTTP(responseRecorder, newRequest("GET", "/wrong", nil, t))
-       assertions.Equal(http.StatusNotFound, responseRecorder.Code)
-       assertions.Contains(responseRecorder.Body.String(), "404 not found.")
-
-       methodNotAllowedHandler := r.MethodNotAllowedHandler
-       handler = http.HandlerFunc(methodNotAllowedHandler.ServeHTTP)
-       responseRecorder = httptest.NewRecorder()
-       handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
-       assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
-       assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
-
-       setLogLevelRoute := r.Get("setLogLevel")
-       assertions.NotNil(setLogLevelRoute)
-       supportedMethods, err = setLogLevelRoute.GetMethods()
-       assertions.Equal([]string{http.MethodPut}, supportedMethods)
-       assertions.Nil(err)
-       path, _ = setLogLevelRoute.GetPathTemplate()
-       assertions.Equal("/admin/log", path)
-}
-
-func TestAddInfoJobToJobsHandler(t *testing.T) {
-       assertions := require.New(t)
-
-       type args struct {
-               job        jobs.JobInfo
-               mockReturn error
-       }
-       tests := []struct {
-               name            string
-               args            args
-               wantedStatus    int
-               wantedErrorInfo *ErrorInfo
-       }{
-               {
-                       name: "AddInfoJobToJobsHandler with correct job, should return OK",
-                       args: args{
-                               job: jobs.JobInfo{
-                                       Owner:            "owner",
-                                       LastUpdated:      "now",
-                                       InfoJobIdentity:  "jobId",
-                                       TargetUri:        "target",
-                                       InfoJobData:      jobs.Parameters{},
-                                       InfoTypeIdentity: "type",
-                               },
-                       },
-                       wantedStatus: http.StatusOK,
-               },
-               {
-                       name: "AddInfoJobToJobsHandler with incorrect job info, should return BadRequest",
-                       args: args{
-                               job: jobs.JobInfo{
-                                       Owner: "bad",
-                               },
-                               mockReturn: errors.New("error"),
-                       },
-                       wantedStatus: http.StatusBadRequest,
-                       wantedErrorInfo: &ErrorInfo{
-                               Status: http.StatusBadRequest,
-                               Detail: "Invalid job info. Cause: error",
-                       },
-               },
-       }
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       jobsHandlerMock := jobshandler.JobsHandler{}
-                       jobsHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
-
-                       callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
-
-                       handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler)
-                       responseRecorder := httptest.NewRecorder()
-                       r := newRequest(http.MethodPost, "/jobs", &tt.args.job, t)
-
-                       handler.ServeHTTP(responseRecorder, r)
-
-                       assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
-                       if tt.wantedErrorInfo != nil {
-                               var actualErrInfo ErrorInfo
-                               err := json.Unmarshal(getBody(responseRecorder, t), &actualErrInfo)
-                               if err != nil {
-                                       t.Error("Unable to unmarshal error body", err)
-                                       t.Fail()
-                               }
-                               assertions.Equal(*tt.wantedErrorInfo, actualErrInfo, tt.name)
-                               assertions.Equal("application/problem+json", responseRecorder.Result().Header.Get("Content-Type"))
-                       }
-                       jobsHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
-               })
-       }
-}
-
-func TestDeleteJob(t *testing.T) {
-       assertions := require.New(t)
-       jobsHandlerMock := jobshandler.JobsHandler{}
-       jobsHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
-
-       callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
-
-       responseRecorder := httptest.NewRecorder()
-       r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"})
-       handler := http.HandlerFunc(callbackHandlerUnderTest.deleteInfoJobHandler)
-       handler.ServeHTTP(responseRecorder, r)
-       assertions.Equal(http.StatusOK, responseRecorder.Result().StatusCode)
-
-       assertions.Equal("", responseRecorder.Body.String())
-
-       jobsHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
-}
-
-func TestSetLogLevel(t *testing.T) {
-       assertions := require.New(t)
-
-       type args struct {
-               logLevel string
-       }
-       tests := []struct {
-               name            string
-               args            args
-               wantedStatus    int
-               wantedErrorInfo *ErrorInfo
-       }{
-               {
-                       name: "Set to valid log level, should return OK",
-                       args: args{
-                               logLevel: "Debug",
-                       },
-                       wantedStatus: http.StatusOK,
-               },
-               {
-                       name: "Set to invalid log level, should return BadRequest",
-                       args: args{
-                               logLevel: "bad",
-                       },
-                       wantedStatus: http.StatusBadRequest,
-                       wantedErrorInfo: &ErrorInfo{
-                               Detail: "Invalid log level: bad. Log level will not be changed!",
-                               Status: http.StatusBadRequest,
-                       },
-               },
-       }
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       callbackHandlerUnderTest := NewProducerCallbackHandler(nil)
-
-                       handler := http.HandlerFunc(callbackHandlerUnderTest.setLogLevel)
-                       responseRecorder := httptest.NewRecorder()
-                       r, _ := http.NewRequest(http.MethodPut, "/admin/log?level="+tt.args.logLevel, nil)
-
-                       handler.ServeHTTP(responseRecorder, r)
-
-                       assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
-                       if tt.wantedErrorInfo != nil {
-                               var actualErrInfo ErrorInfo
-                               err := json.Unmarshal(getBody(responseRecorder, t), &actualErrInfo)
-                               if err != nil {
-                                       t.Error("Unable to unmarshal error body", err)
-                                       t.Fail()
-                               }
-                               assertions.Equal(*tt.wantedErrorInfo, actualErrInfo, tt.name)
-                               assertions.Equal("application/problem+json", responseRecorder.Result().Header.Get("Content-Type"))
-                       }
-               })
-       }
-}
-
-func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
-       var body io.Reader
-       if jobInfo != nil {
-               bodyAsBytes, _ := json.Marshal(jobInfo)
-               body = ioutil.NopCloser(bytes.NewReader(bodyAsBytes))
-       }
-       if req, err := http.NewRequest(method, url, body); err == nil {
-               return req
-       } else {
-               t.Fatalf("Could not create request due to: %v", err)
-               return nil
-       }
-}
-
-func getBody(responseRecorder *httptest.ResponseRecorder, t *testing.T) []byte {
-       buf := new(bytes.Buffer)
-       if _, err := buf.ReadFrom(responseRecorder.Body); err != nil {
-               t.Error("Unable to read error body", err)
-               t.Fail()
-       }
-       return buf.Bytes()
-}
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
deleted file mode 100644 (file)
index 65a84a2..0000000
+++ /dev/null
@@ -1,167 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package main
-
-import (
-       "crypto/tls"
-       "encoding/json"
-       "fmt"
-       "net/http"
-       "time"
-
-       "github.com/gorilla/mux"
-       log "github.com/sirupsen/logrus"
-       _ "oransc.org/nonrtric/dmaapmediatorproducer/api"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
-
-       httpSwagger "github.com/swaggo/http-swagger"
-)
-
-var configuration *config.Config
-var registered bool
-
-func init() {
-       configuration = config.New()
-}
-
-// @title    DMaaP Mediator Producer
-// @version  1.1.0
-
-// @license.name  Apache 2.0
-// @license.url   http://www.apache.org/licenses/LICENSE-2.0.html
-
-func main() {
-       log.SetLevel(configuration.LogLevel)
-       log.Debug("Initializing DMaaP Mediator Producer")
-       log.Debug("Using configuration: ", configuration)
-       if err := validateConfiguration(configuration); err != nil {
-               log.Fatalf("Stopping producer due to error: %v", err)
-       }
-       callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
-
-       var cert tls.Certificate
-       if c, err := restclient.CreateClientCertificate(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
-               cert = c
-       } else {
-               log.Fatalf("Stopping producer due to error: %v", err)
-       }
-
-       retryClient := restclient.CreateRetryClient(cert)
-       kafkaFactory := kafkaclient.KafkaFactoryImpl{BootstrapServer: configuration.KafkaBootstrapServers}
-       distributionClient := restclient.CreateClientWithoutRetry(cert, 10*time.Second)
-
-       jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, kafkaFactory, distributionClient)
-       go startCallbackServer(jobsManager, callbackAddress)
-
-       if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
-               log.Fatalf("Stopping producer due to: %v", err)
-       }
-       registered = true
-       jobsManager.StartJobsForAllTypes()
-
-       log.Debug("Starting DMaaP Mediator Producer")
-
-       keepProducerAlive()
-}
-
-func validateConfiguration(configuration *config.Config) error {
-       if configuration.InfoProducerHost == "" {
-               return fmt.Errorf("missing INFO_PRODUCER_HOST")
-       }
-       if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
-               return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
-       }
-       if configuration.DMaaPMRAddress == "" && configuration.KafkaBootstrapServers == "" {
-               return fmt.Errorf("at least one of DMAAP_MR_ADDR or KAFKA_BOOTSRAP_SERVERS must be provided")
-       }
-       return nil
-}
-func registerTypesAndProducer(jobTypesManager jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
-       registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
-       configTypes, err := config.GetJobTypesFromConfiguration("configs")
-       if err != nil {
-               return fmt.Errorf("unable to register all types due to: %v", err)
-       }
-       regErr := registrator.RegisterTypes(jobTypesManager.LoadTypesFromConfiguration(configTypes))
-       if regErr != nil {
-               return fmt.Errorf("unable to register all types due to: %v", regErr)
-       }
-
-       producer := config.ProducerRegistrationInfo{
-               InfoProducerSupervisionCallbackUrl: callbackAddress + server.HealthCheckPath,
-               SupportedInfoTypes:                 jobTypesManager.GetSupportedTypes(),
-               InfoJobCallbackUrl:                 callbackAddress + server.AddJobPath,
-       }
-       if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
-               return fmt.Errorf("unable to register producer due to: %v", err)
-       }
-       return nil
-}
-
-func startCallbackServer(jobsManager jobs.JobsManager, callbackAddress string) {
-       log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
-       r := server.NewRouter(jobsManager, statusHandler)
-       addSwaggerHandler(r)
-       if restclient.IsUrlSecure(callbackAddress) {
-               log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
-       } else {
-               log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
-       }
-}
-
-type ProducerStatus struct {
-       // The registration status of the producer in Information Coordinator Service. Either `registered` or `not registered`
-       RegisteredStatus string `json:"registeredStatus" swaggertype:"string" example:"registered"`
-} // @name  ProducerStatus
-
-// @Summary      Get status
-// @Description  Get the status of the producer. Will show if the producer has registered in ICS.
-// @Tags         Data producer (callbacks)
-// @Produce      json
-// @Success      200  {object}  ProducerStatus
-// @Router       /health_check [get]
-func statusHandler(w http.ResponseWriter, r *http.Request) {
-       status := ProducerStatus{
-               RegisteredStatus: "not registered",
-       }
-       if registered {
-               status.RegisteredStatus = "registered"
-       }
-       json.NewEncoder(w).Encode(status)
-}
-
-// @Summary      Get Swagger Documentation
-// @Description  Get the Swagger API documentation for the producer.
-// @Tags         Admin
-// @Success      200
-// @Router       /swagger [get]
-func addSwaggerHandler(r *mux.Router) {
-       r.PathPrefix("/swagger").Handler(httpSwagger.WrapHandler)
-}
-
-func keepProducerAlive() {
-       forever := make(chan int)
-       <-forever
-}
diff --git a/dmaap-mediator-producer/main_test.go b/dmaap-mediator-producer/main_test.go
deleted file mode 100644 (file)
index 19851be..0000000
+++ /dev/null
@@ -1,205 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2022: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package main
-
-import (
-       "bytes"
-       "fmt"
-       "io/ioutil"
-       "net/http"
-       "os/exec"
-       "sync"
-       "testing"
-       "time"
-
-       "github.com/stretchr/testify/require"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
-)
-
-// This is not a real test, just a way to get the Swagger documentation generated automatically.
-// Hence there are no assertions in this test.
-func TestGenerateSwaggerDocs(t *testing.T) {
-       cmd := exec.Command("./generate_swagger_docs.sh")
-
-       err := cmd.Run()
-       if err != nil {
-               fmt.Println("Error generating Swagger:", err)
-       }
-}
-
-func TestValidateConfiguration(t *testing.T) {
-       assertions := require.New(t)
-
-       validConfig := config.Config{
-               InfoProducerHost:      "host",
-               DMaaPMRAddress:        "address",
-               KafkaBootstrapServers: "servers",
-               ProducerCertPath:      "path",
-               ProducerKeyPath:       "path",
-       }
-       assertions.Nil(validateConfiguration(&validConfig))
-
-       missingProducerHost := config.Config{
-               DMaaPMRAddress:        "address",
-               KafkaBootstrapServers: "servers",
-               ProducerCertPath:      "path",
-               ProducerKeyPath:       "path",
-       }
-       assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
-
-       missingCert := config.Config{
-               InfoProducerHost:      "host",
-               DMaaPMRAddress:        "address",
-               KafkaBootstrapServers: "servers",
-               ProducerKeyPath:       "path",
-       }
-       assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
-
-       missingCertKey := config.Config{
-               InfoProducerHost:      "host",
-               DMaaPMRAddress:        "address",
-               KafkaBootstrapServers: "servers",
-               ProducerCertPath:      "path",
-       }
-       assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
-
-       missingMRAddress := config.Config{
-               InfoProducerHost:      "host",
-               KafkaBootstrapServers: "servers",
-               ProducerCertPath:      "path",
-               ProducerKeyPath:       "path",
-       }
-       assertions.Nil(validateConfiguration(&missingMRAddress))
-
-       missingKafkaServers := config.Config{
-               InfoProducerHost: "host",
-               DMaaPMRAddress:   "address",
-               ProducerCertPath: "path",
-               ProducerKeyPath:  "path",
-       }
-       assertions.Nil(validateConfiguration(&missingKafkaServers))
-
-       missingMRAddressdAndKafkaServers := config.Config{
-               InfoProducerHost: "host",
-               ProducerCertPath: "path",
-               ProducerKeyPath:  "path",
-       }
-       assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
-       assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
-}
-
-func TestRegisterTypesAndProducer(t *testing.T) {
-       assertions := require.New(t)
-
-       wg := sync.WaitGroup{}
-       clientMock := NewTestClient(func(req *http.Request) *http.Response {
-               if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
-                       assertions.Equal(req.Method, "PUT")
-                       body := getBodyAsString(req, t)
-                       assertions.Contains(body, "info_job_data_schema")
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
-                       wg.Done()
-                       return &http.Response{
-                               StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
-                       assertions.Equal(req.Method, "PUT")
-                       body := getBodyAsString(req, t)
-                       assertions.Contains(body, "info_job_data_schema")
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
-                       wg.Done()
-                       return &http.Response{
-                               StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
-                       assertions.Equal(req.Method, "PUT")
-                       body := getBodyAsString(req, t)
-                       assertions.Contains(body, "callbackAddress/health_check")
-                       assertions.Contains(body, "callbackAddress/info_job")
-                       assertions.Contains(body, "Kafka_TestTopic")
-                       assertions.Contains(body, "STD_Fault_Messages")
-                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
-                       wg.Done()
-                       return &http.Response{
-                               StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
-                               Header:     make(http.Header), // Must be set to non-nil value or it panics
-                       }
-               }
-               t.Error("Wrong call to client: ", req)
-               t.Fail()
-               return nil
-       })
-       jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
-
-       wg.Add(3)
-       err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
-
-       assertions.Nil(err)
-
-       if waitTimeout(&wg, 2*time.Second) {
-               t.Error("Not all calls to server were made")
-               t.Fail()
-       }
-}
-
-type RoundTripFunc func(req *http.Request) *http.Response
-
-func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
-       return f(req), nil
-}
-
-//NewTestClient returns *http.Client with Transport replaced to avoid making real calls
-func NewTestClient(fn RoundTripFunc) *http.Client {
-       return &http.Client{
-               Transport: RoundTripFunc(fn),
-       }
-}
-
-func getBodyAsString(req *http.Request, t *testing.T) string {
-       buf := new(bytes.Buffer)
-       if _, err := buf.ReadFrom(req.Body); err != nil {
-               t.Fail()
-       }
-       return buf.String()
-}
-
-// waitTimeout waits for the waitgroup for the specified max timeout.
-// Returns true if waiting timed out.
-func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
-       c := make(chan struct{})
-       go func() {
-               defer close(c)
-               wg.Wait()
-       }()
-       select {
-       case <-c:
-               return false // completed normally
-       case <-time.After(timeout):
-               return true // timed out
-       }
-}
diff --git a/dmaap-mediator-producer/mocks/KafkaConsumer.go b/dmaap-mediator-producer/mocks/KafkaConsumer.go
deleted file mode 100644 (file)
index 8ae0893..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-// Code generated by mockery v1.0.0. DO NOT EDIT.
-
-package mocks
-
-import (
-       kafka "github.com/confluentinc/confluent-kafka-go/kafka"
-
-       mock "github.com/stretchr/testify/mock"
-
-       time "time"
-)
-
-// KafkaConsumer is an autogenerated mock type for the KafkaConsumer type
-type KafkaConsumer struct {
-       mock.Mock
-}
-
-// Commit provides a mock function with given fields:
-func (_m KafkaConsumer) Commit() ([]kafka.TopicPartition, error) {
-       ret := _m.Called()
-
-       var r0 []kafka.TopicPartition
-       if rf, ok := ret.Get(0).(func() []kafka.TopicPartition); ok {
-               r0 = rf()
-       } else {
-               if ret.Get(0) != nil {
-                       r0 = ret.Get(0).([]kafka.TopicPartition)
-               }
-       }
-
-       var r1 error
-       if rf, ok := ret.Get(1).(func() error); ok {
-               r1 = rf()
-       } else {
-               r1 = ret.Error(1)
-       }
-
-       return r0, r1
-}
-
-// ReadMessage provides a mock function with given fields: timeout
-func (_m KafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
-       ret := _m.Called(timeout)
-
-       var r0 *kafka.Message
-       if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok {
-               r0 = rf(timeout)
-       } else {
-               if ret.Get(0) != nil {
-                       r0 = ret.Get(0).(*kafka.Message)
-               }
-       }
-
-       var r1 error
-       if rf, ok := ret.Get(1).(func(time.Duration) error); ok {
-               r1 = rf(timeout)
-       } else {
-               r1 = ret.Error(1)
-       }
-
-       return r0, r1
-}
-
-// Subscribe provides a mock function with given fields: topic
-func (_m KafkaConsumer) Subscribe(topic string) error {
-       ret := _m.Called(topic)
-
-       var r0 error
-       if rf, ok := ret.Get(0).(func(string) error); ok {
-               r0 = rf(topic)
-       } else {
-               r0 = ret.Error(0)
-       }
-
-       return r0
-}
diff --git a/dmaap-mediator-producer/mocks/KafkaFactory.go b/dmaap-mediator-producer/mocks/KafkaFactory.go
deleted file mode 100644 (file)
index f05457a..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-// Code generated by mockery v1.0.0. DO NOT EDIT.
-
-package mocks
-
-import (
-       mock "github.com/stretchr/testify/mock"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
-)
-
-// KafkaFactory is an autogenerated mock type for the KafkaFactory type
-type KafkaFactory struct {
-       mock.Mock
-}
-
-// NewKafkaConsumer provides a mock function with given fields: topicID
-func (_m KafkaFactory) NewKafkaConsumer(topicID string) (kafkaclient.KafkaConsumer, error) {
-       ret := _m.Called(topicID)
-
-       var r0 kafkaclient.KafkaConsumer
-       if rf, ok := ret.Get(0).(func(string) kafkaclient.KafkaConsumer); ok {
-               r0 = rf(topicID)
-       } else {
-               if ret.Get(0) != nil {
-                       r0 = ret.Get(0).(kafkaclient.KafkaConsumer)
-               }
-       }
-
-       var r1 error
-       if rf, ok := ret.Get(1).(func(string) error); ok {
-               r1 = rf(topicID)
-       } else {
-               r1 = ret.Error(1)
-       }
-
-       return r0, r1
-}
diff --git a/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go b/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go
deleted file mode 100644 (file)
index ab399df..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-// Code generated by mockery v2.9.3. DO NOT EDIT.
-
-package httpclient
-
-import (
-       http "net/http"
-
-       mock "github.com/stretchr/testify/mock"
-)
-
-// HTTPClient is an autogenerated mock type for the HTTPClient type
-type HTTPClient struct {
-       mock.Mock
-}
-
-// Do provides a mock function with given fields: _a0
-func (_m *HTTPClient) Do(_a0 *http.Request) (*http.Response, error) {
-       ret := _m.Called(_a0)
-
-       var r0 *http.Response
-       if rf, ok := ret.Get(0).(func(*http.Request) *http.Response); ok {
-               r0 = rf(_a0)
-       } else {
-               if ret.Get(0) != nil {
-                       r0 = ret.Get(0).(*http.Response)
-               }
-       }
-
-       var r1 error
-       if rf, ok := ret.Get(1).(func(*http.Request) error); ok {
-               r1 = rf(_a0)
-       } else {
-               r1 = ret.Error(1)
-       }
-
-       return r0, r1
-}
-
-// Get provides a mock function with given fields: url
-func (_m *HTTPClient) Get(url string) (*http.Response, error) {
-       ret := _m.Called(url)
-
-       var r0 *http.Response
-       if rf, ok := ret.Get(0).(func(string) *http.Response); ok {
-               r0 = rf(url)
-       } else {
-               if ret.Get(0) != nil {
-                       r0 = ret.Get(0).(*http.Response)
-               }
-       }
-
-       var r1 error
-       if rf, ok := ret.Get(1).(func(string) error); ok {
-               r1 = rf(url)
-       } else {
-               r1 = ret.Error(1)
-       }
-
-       return r0, r1
-}
diff --git a/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go b/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go
deleted file mode 100644 (file)
index 271590f..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-// Code generated by mockery v2.9.3. DO NOT EDIT.
-
-package jobshandler
-
-import (
-       mock "github.com/stretchr/testify/mock"
-       jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-)
-
-// JobsHandler is an autogenerated mock type for the JobsHandler type
-type JobsHandler struct {
-       mock.Mock
-}
-
-// AddJob provides a mock function with given fields: _a0
-func (_m *JobsHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
-       ret := _m.Called(_a0)
-
-       var r0 error
-       if rf, ok := ret.Get(0).(func(jobs.JobInfo) error); ok {
-               r0 = rf(_a0)
-       } else {
-               r0 = ret.Error(0)
-       }
-
-       return r0
-}
-
-// DeleteJob provides a mock function with given fields: jobId
-func (_m *JobsHandler) DeleteJobFromRESTCall(jobId string) {
-       _m.Called(jobId)
-}
diff --git a/dmaap-mediator-producer/security/producer.crt b/dmaap-mediator-producer/security/producer.crt
deleted file mode 100644 (file)
index 0f6d8a3..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
------BEGIN CERTIFICATE-----
-MIIDXzCCAkegAwIBAgIUEbuDTP0ixwxCxCQ9tR5DijGCbtkwDQYJKoZIhvcNAQEL
-BQAwPzELMAkGA1UEBhMCc2UxDDAKBgNVBAoMA0VTVDERMA8GA1UECwwIRXJpY3Nz
-b24xDzANBgNVBAMMBnNlcnZlcjAeFw0yMTEwMTkxNDA1MzVaFw0zMTEwMTcxNDA1
-MzVaMD8xCzAJBgNVBAYTAnNlMQwwCgYDVQQKDANFU1QxETAPBgNVBAsMCEVyaWNz
-c29uMQ8wDQYDVQQDDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
-AoIBAQDnH4imV8kx/mXz6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEE
-Y6VWU47bSYRn2xJOP+wmfKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe
-72iMbTc5q2uYi0ImB5/m3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNrua
-I4iOnMvvuc71gvHol7appRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVo
-uUZYYJseFhOlIANaXn6qmz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPD
-x9PjmGmf6eOEC2ZHBi4OHwjIzmLnAgMBAAGjUzBRMB0GA1UdDgQWBBRjeDLPpLm2
-W623wna7xBCbHxtxVjAfBgNVHSMEGDAWgBRjeDLPpLm2W623wna7xBCbHxtxVjAP
-BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAbFUAWFZaIMXmd5qv/
-xJYr1oPJpsmbgWGRWZWDZqbUabvWObyXlDJWIau60BerfcC5TmyElBjTyONSGwCT
-tq+SVB0PXpgqa8ZQ25Ytn2AMDFWhrGbOefCXs6te3HGq6BNubTWrOVIvJypCwC95
-+iXVuDd4eg+n2fWv7h7fZRZHum/zLoRxB2lKoMMbc/BQX9hbtP6xyvIVvaYdhcJw
-VzJJGIDqpMiMH6IBaOFSmgfOyGblGKAicj3E3kpGBfadLx3R+9V6aG7zyBnVbr2w
-YJbV2Ay4PrF+PTpCMB/mNwC5RBTYHpSNdrCMSyq3I+QPVJq8dPJr7fd1Uwl3WHqX
-FV0h
------END CERTIFICATE-----
diff --git a/dmaap-mediator-producer/security/producer.key b/dmaap-mediator-producer/security/producer.key
deleted file mode 100644 (file)
index 5346bb7..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
------BEGIN PRIVATE KEY-----
-MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnH4imV8kx/mXz
-6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEEY6VWU47bSYRn2xJOP+wm
-fKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe72iMbTc5q2uYi0ImB5/m
-3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNruaI4iOnMvvuc71gvHol7ap
-pRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVouUZYYJseFhOlIANaXn6q
-mz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPDx9PjmGmf6eOEC2ZHBi4O
-HwjIzmLnAgMBAAECggEBAMq1lZyPkh8PCUyLVX3VhC4jRybyAWBI+piKx+4EI6l/
-laP5dZcegCoo+w/mdbTpRHqAWGjec4e9+Nkoq8rLG6B2SCfaRJUYiEQSEvSBHAid
-BZqKK4B82GXQavNU91Vy1OT3vD7mpPXF6jEK6gAA0C4Wt7Lzo7ZfqEavRBDMsNnV
-jOxLwWJCFSKhfeA6grJCnagmEDKSxxazlNBgCahjPf/+IRJZ7Vk4Zjq+I/5nWKf8
-lYaQExKDIANuM/jMRnYVq5k4g2MKHUADWGTSvG1DMJiMHzdxb2miZovpIkEE86bC
-wKBuele9IR6Rb/wygYj7WdaWysZ081OT7mNyju08e4ECgYEA8+q7vv4Nlz8bAcmY
-Ip5517s15M5D9iLsE2Q5m9Zs99rUyQv0E8ekpChhtTSdvj+eNl39O4hji46Gyceu
-MYPfNL7+YWaFDxuyaXEe/OFuKbFqgE1p08HXFcQJCvgqD1MWO5b9BRDc0qpNFIA8
-eN9xFBMQ2UFaALBMAup7Ef85q4kCgYEA8pKOAIsgmlnO8P9cPzkMC1oozslraAti
-1JnOJjwPLoHFubtH2u7WoIkSvNfeNwfrsVXwAP0m7C8p7qhYppS+0XGjKpYNSezK
-1GCqCVv8R1m+AsSseSUUaQCmEydd+gQbBq3r4u3wU3ylrgAoR3m+7SVyhvD+vbwI
-7+zfj+O3zu8CgYEAqaAsQH5c5Tm1hmCztB+RjD1dFWl8ScevdSzWA1HzJcrA/6+Y
-ZckI7kBG8sVMjemgFR735FbNI1hS1DBRK44Rw5SvQv0Qu5j/UeShMCt1ePkwn1k2
-p1S+Rxy1TTOXzGBzra0q+ELpzncwc3lalJSPBu7bYLrZ5HC167E1NSbQ7EECgYBo
-e/IIj+TyNz7pFcVhQixK84HiWGYYQddHJhzi4TnU2XcWonG3/uqZ6ZEVoJIJ+DJw
-h0jC1EggscwJDaBp2GY9Bwq2PD3rGsDfK+fx8ho/jYtH2/lCkVMyS2I9m9Zh68TM
-YrvZWo4LGASxZ0XyS6GOunOTZlkD1uuulMRTUU4KJwKBgQCwyjs0/ElVFvO0lPIC
-JJ//B5rqI7hNMJuTBvr4yiqVZdbgFukaU7FBVyNYDMpZi/nRbpglm+psFcwXtL8n
-bHOIGLkh8vB7OuETRYhXs567lPYtO4BmHZlXW70Sq/0xqi/Mmz1RuEg4SQ1Ug5oy
-wG6IV5EWSQAhsGirdybQ+bY7Kw==
------END PRIVATE KEY-----
diff --git a/dmaap-mediator-producer/stub/consumer/consumerstub.go b/dmaap-mediator-producer/stub/consumer/consumerstub.go
deleted file mode 100644 (file)
index 526e61e..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package main
-
-import (
-       "encoding/json"
-       "flag"
-       "fmt"
-       "io"
-       http "net/http"
-       "time"
-
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
-)
-
-var httpClient http.Client
-
-func main() {
-       httpClient = http.Client{
-               Timeout: time.Second * 5,
-       }
-       port := flag.Int("port", 40935, "The port this consumer will listen on")
-       flag.Parse()
-       http.HandleFunc("/jobs", handleData)
-
-       registerJob(*port)
-
-       fmt.Println("Starting consumer on port: ", *port)
-       fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
-}
-
-func registerJob(port int) {
-       jobInfo := struct {
-               JobOwner      string `json:"job_owner"`
-               JobResultUri  string `json:"job_result_uri"`
-               InfoTypeId    string `json:"info_type_id"`
-               JobDefinition string `json:"job_definition"`
-       }{
-               JobOwner:      fmt.Sprintf("test%v", port),
-               JobResultUri:  fmt.Sprintf("http://localhost:%v/jobs", port),
-               InfoTypeId:    "STD_Fault_Messages",
-               JobDefinition: "{}",
-       }
-       fmt.Println("Registering consumer: ", jobInfo)
-       body, _ := json.Marshal(jobInfo)
-       putErr := restclient.Put(fmt.Sprintf("https://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)
-       if putErr != nil {
-               fmt.Println("Unable to register consumer: ", putErr)
-       }
-}
-
-func handleData(w http.ResponseWriter, req *http.Request) {
-       defer req.Body.Close()
-       if reqData, err := io.ReadAll(req.Body); err == nil {
-               fmt.Println("Consumer received body: ", string(reqData))
-       }
-}
diff --git a/dmaap-mediator-producer/stub/dmaap/mrstub.go b/dmaap-mediator-producer/stub/dmaap/mrstub.go
deleted file mode 100644 (file)
index 451bc9a..0000000
+++ /dev/null
@@ -1,103 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package main
-
-import (
-       "encoding/json"
-       "flag"
-       "fmt"
-       "math/rand"
-       "net/http"
-       "time"
-)
-
-var r = rand.New(rand.NewSource(time.Now().UnixNano()))
-
-type FaultMessage struct {
-       Event Event `json:"event"`
-}
-
-type Event struct {
-       CommonEventHeader CommonEventHeader `json:"commonEventHeader"`
-       FaultFields       FaultFields       `json:"faultFields"`
-}
-
-type CommonEventHeader struct {
-       Domain     string `json:"domain"`
-       SourceName string `json:"sourceName"`
-}
-
-type FaultFields struct {
-       AlarmCondition string `json:"alarmCondition"`
-       EventSeverity  string `json:"eventSeverity"`
-}
-
-func main() {
-       port := flag.Int("port", 3905, "The port this message router will listen on")
-       flag.Parse()
-
-       http.HandleFunc("/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", handleData)
-
-       fmt.Print("Starting mr on port: ", *port)
-       fmt.Println(http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../security/producer.crt", "../../security/producer.key", nil))
-
-}
-
-var critical = true
-
-func handleData(w http.ResponseWriter, req *http.Request) {
-       time.Sleep(time.Duration(r.Intn(3)) * time.Second)
-
-       w.Header().Set("Content-Type", "application/json")
-
-       var responseBody []byte
-       if critical {
-               responseBody = getFaultMessage("CRITICAL")
-               fmt.Println("Sending CRITICAL")
-               critical = false
-       } else {
-               responseBody = getFaultMessage("NORMAL")
-               fmt.Println("Sending NORMAL")
-               critical = true
-       }
-       fmt.Fprint(w, string(responseBody))
-}
-
-func getFaultMessage(eventSeverity string) []byte {
-       linkFailureMessage := FaultMessage{
-               Event: Event{
-                       CommonEventHeader: CommonEventHeader{
-                               Domain:     "fault",
-                               SourceName: "ERICSSON-O-RU-11220",
-                       },
-                       FaultFields: FaultFields{
-                               AlarmCondition: "28",
-                               EventSeverity:  eventSeverity,
-                       },
-               },
-       }
-       fmt.Printf("Sending message: %v\n", linkFailureMessage)
-
-       messageAsByteArray, _ := json.Marshal(linkFailureMessage)
-       response := [1]string{string(messageAsByteArray)}
-       responseAsByteArray, _ := json.Marshal(response)
-       return responseAsByteArray
-}
diff --git a/dmaap-mediator-producer/stub/ics/ics.go b/dmaap-mediator-producer/stub/ics/ics.go
deleted file mode 100644 (file)
index 87457c2..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-// -
-//   ========================LICENSE_START=================================
-//   O-RAN-SC
-//   %%
-//   Copyright (C) 2021: Nordix Foundation
-//   %%
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
-//
-//        http://www.apache.org/licenses/LICENSE-2.0
-//
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   ========================LICENSE_END===================================
-//
-
-package main
-
-import (
-       "flag"
-       "fmt"
-       "io/ioutil"
-       "net/http"
-
-       "github.com/gorilla/mux"
-)
-
-func main() {
-       port := flag.Int("port", 8434, "The port this stub will listen on")
-       flag.Parse()
-       fmt.Println("Starting ICS stub on port ", *port)
-
-       r := mux.NewRouter()
-       r.HandleFunc("/data-producer/v1/info-types/{typeId}", handleTypeRegistration).Methods(http.MethodPut, http.MethodPut)
-       r.HandleFunc("/data-producer/v1/info-producers/{producerId}", handleProducerRegistration).Methods(http.MethodPut, http.MethodPut)
-       fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), r))
-}
-
-func handleTypeRegistration(w http.ResponseWriter, r *http.Request) {
-       vars := mux.Vars(r)
-       id, ok := vars["typeId"]
-       if ok {
-               fmt.Printf("Registered type %v with schema: %v\n", id, readBody(r))
-       }
-}
-
-func handleProducerRegistration(w http.ResponseWriter, r *http.Request) {
-       vars := mux.Vars(r)
-       id, ok := vars["producerId"]
-       if ok {
-               fmt.Printf("Registered producer %v with data: %v\n", id, readBody(r))
-       }
-}
-
-func readBody(r *http.Request) string {
-       b, readErr := ioutil.ReadAll(r.Body)
-       if readErr != nil {
-               return fmt.Sprintf("Unable to read body due to: %v", readErr)
-       }
-       return string(b)
-}
index b87b789..7c6b6c2 100644 (file)
@@ -58,22 +58,6 @@ The API is also described in Swagger-JSON and YAML:
 
    "DMaaP Adaptor API", ":download:`link <../dmaap-adaptor-java/api/api.json>`", ":download:`link <../dmaap-adaptor-java/api/api.yaml>`"
 
-DMaaP Mediator Producer
-=======================
-
-The DMaaP Mediator Producer provides support for push delivery of any data received from DMaaP or Kafka.
-
-See `DMaaP Mediator Producer API <./dmaap-mediator-producer-api.html>`_ for full details of the API.
-
-The API is also described in Swagger-JSON and YAML:
-
-
-.. csv-table::
-   :header: "API name", "|swagger-icon|", "|yaml-icon|"
-   :widths: 10,5, 5
-
-   "DMaaP Mediator Producer API", ":download:`link <../dmaap-mediator-producer/api/swagger.json>`", ":download:`link <../dmaap-mediator-producer/api/swagger.yaml>`"
-
 K8S Helm Chart LCM Manager (Initial)
 ====================================
 
index 87a9b71..21bfee9 100644 (file)
@@ -26,12 +26,6 @@ redoc = [
                 'name': 'DMaaP Adaptor API',
                 'page': 'dmaap-adaptor-api',
                 'spec': '../dmaap-adaptor-java/api/api.json',
-            },
-            {
-                'name': 'DMaaP Mediator Producer API',
-                'page': 'dmaap-mediator-producer-api',
-                'spec': '../dmaap-mediator-producer/api/swagger.json',
-                'embed': True,
             }
         ]
 
index af206e7..b77785b 100644 (file)
@@ -68,11 +68,6 @@ The following properties in the application.yaml file have to be modified:
 * app.webclient.trust-store=./config/truststore.jks
 * app.configuration-filepath=./src/test/resources/test_application_configuration.json
 
-DMaaP Mediator Producer
------------------------
-
-To build and run this Go implementation, see the README.md file under the folder "dmaap-mediator-producer" in the "nonrtric" repo.
-
 O-DU & O-RU fronthaul recovery
 ------------------------------
 
index f598d5a..e8cd929 100644 (file)
@@ -185,7 +185,7 @@ Information jobs defined using ICS then allow information consumers to retrieve
 There are two alternative implementations to allow Information Consumers to consume DMaaP or Kafka events as coordinated Information Jobs.
 
 1. A version implemented in Java Spring (DMaaP Adaptor Service).
-2. A version implemented in Go (DMaaP Mediator Producer).
+2. A version implemented in Go (DMaaP Mediator Producer), see `DMaaP Mediator Producer documentation site <https://docs.o-ran-sc.org/projects/o-ran-sc-nonrtric-plt-dmaapmediatorproducer>`_.
 
 Initial Non-RT-RIC App Catalogue
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
index 3e8dc26..31c56c1 100644 (file)
@@ -136,23 +136,6 @@ Version history DMaaP Adator
 +------------+----------+------------------+-----------------+
 
 
-Version history DMaaP Mediator Producer
-=======================================
-
-+------------+----------+------------------+-----------------+
-| **Date**   | **Ver.** | **Author**       | **Comment**     |
-|            |          |                  |                 |
-+------------+----------+------------------+-----------------+
-| 2021-12-13 | 1.0.0    | Henrik Andersson | E Release       |
-|            |          |                  | Initial version |
-+------------+----------+------------------+-----------------+
-| 2022-02-07 | 1.0.1    | Henrik Andersson | E Maintenance   |
-|            |          |                  | Release         |
-|            |          |                  | Added Kafka as  |
-|            |          |                  | message source  |
-+------------+----------+------------------+-----------------+
-
-
 Version history usecase O-RU Front-Haul Recovery Script version
 ===============================================================