--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2022: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os/exec"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+)
+
+// This is not a real test, just a way to get the Swagger documentation generated automatically.
+// Hence there are no assertions in this test.
+func TestGenerateSwaggerDocs(t *testing.T) {
+ cmd := exec.Command("./generate_swagger_docs.sh")
+
+ err := cmd.Run()
+ if err != nil {
+ fmt.Println("Error generating Swagger:", err)
+ }
+}
+
+func TestValidateConfiguration(t *testing.T) {
+ assertions := require.New(t)
+
+ validConfig := config.Config{
+ InfoProducerHost: "host",
+ DMaaPMRAddress: "address",
+ KafkaBootstrapServers: "servers",
+ ProducerCertPath: "path",
+ ProducerKeyPath: "path",
+ }
+ assertions.Nil(validateConfiguration(&validConfig))
+
+ missingProducerHost := config.Config{
+ DMaaPMRAddress: "address",
+ KafkaBootstrapServers: "servers",
+ ProducerCertPath: "path",
+ ProducerKeyPath: "path",
+ }
+ assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
+
+ missingCert := config.Config{
+ InfoProducerHost: "host",
+ DMaaPMRAddress: "address",
+ KafkaBootstrapServers: "servers",
+ ProducerKeyPath: "path",
+ }
+ assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
+
+ missingCertKey := config.Config{
+ InfoProducerHost: "host",
+ DMaaPMRAddress: "address",
+ KafkaBootstrapServers: "servers",
+ ProducerCertPath: "path",
+ }
+ assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
+
+ missingMRAddress := config.Config{
+ InfoProducerHost: "host",
+ KafkaBootstrapServers: "servers",
+ ProducerCertPath: "path",
+ ProducerKeyPath: "path",
+ }
+ assertions.Nil(validateConfiguration(&missingMRAddress))
+
+ missingKafkaServers := config.Config{
+ InfoProducerHost: "host",
+ DMaaPMRAddress: "address",
+ ProducerCertPath: "path",
+ ProducerKeyPath: "path",
+ }
+ assertions.Nil(validateConfiguration(&missingKafkaServers))
+
+ missingMRAddressdAndKafkaServers := config.Config{
+ InfoProducerHost: "host",
+ ProducerCertPath: "path",
+ ProducerKeyPath: "path",
+ }
+ assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
+ assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
+}
+
+func TestRegisterTypesAndProducer(t *testing.T) {
+ assertions := require.New(t)
+
+ wg := sync.WaitGroup{}
+ clientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
+ assertions.Equal(req.Method, "PUT")
+ body := getBodyAsString(req, t)
+ assertions.Contains(body, "info_job_data_schema")
+ assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
+ assertions.Equal(req.Method, "PUT")
+ body := getBodyAsString(req, t)
+ assertions.Contains(body, "info_job_data_schema")
+ assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
+ assertions.Equal(req.Method, "PUT")
+ body := getBodyAsString(req, t)
+ assertions.Contains(body, "callbackAddress/health_check")
+ assertions.Contains(body, "callbackAddress/info_job")
+ assertions.Contains(body, "Kafka_TestTopic")
+ assertions.Contains(body, "STD_Fault_Messages")
+ assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+ jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
+
+ wg.Add(3)
+ err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
+
+ assertions.Nil(err)
+
+ if waitTimeout(&wg, 2*time.Second) {
+ t.Error("Not all calls to server were made")
+ t.Fail()
+ }
+}
+
+type RoundTripFunc func(req *http.Request) *http.Response
+
+func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
+ return f(req), nil
+}
+
+//NewTestClient returns *http.Client with Transport replaced to avoid making real calls
+func NewTestClient(fn RoundTripFunc) *http.Client {
+ return &http.Client{
+ Transport: RoundTripFunc(fn),
+ }
+}
+
+func getBodyAsString(req *http.Request, t *testing.T) string {
+ buf := new(bytes.Buffer)
+ if _, err := buf.ReadFrom(req.Body); err != nil {
+ t.Fail()
+ }
+ return buf.String()
+}
+
+// waitTimeout waits for the waitgroup for the specified max timeout.
+// Returns true if waiting timed out.
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+ c := make(chan struct{})
+ go func() {
+ defer close(c)
+ wg.Wait()
+ }()
+ select {
+ case <-c:
+ return false // completed normally
+ case <-time.After(timeout):
+ return true // timed out
+ }
+}