Uplift from master branch
[nonrtric.git] / dmaap-mediator-producer / main_test.go
diff --git a/dmaap-mediator-producer/main_test.go b/dmaap-mediator-producer/main_test.go
new file mode 100644 (file)
index 0000000..19851be
--- /dev/null
@@ -0,0 +1,205 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2022: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+       "bytes"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "os/exec"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+)
+
+// This is not a real test, just a way to get the Swagger documentation generated automatically.
+// Hence there are no assertions in this test.
+func TestGenerateSwaggerDocs(t *testing.T) {
+       cmd := exec.Command("./generate_swagger_docs.sh")
+
+       err := cmd.Run()
+       if err != nil {
+               fmt.Println("Error generating Swagger:", err)
+       }
+}
+
+func TestValidateConfiguration(t *testing.T) {
+       assertions := require.New(t)
+
+       validConfig := config.Config{
+               InfoProducerHost:      "host",
+               DMaaPMRAddress:        "address",
+               KafkaBootstrapServers: "servers",
+               ProducerCertPath:      "path",
+               ProducerKeyPath:       "path",
+       }
+       assertions.Nil(validateConfiguration(&validConfig))
+
+       missingProducerHost := config.Config{
+               DMaaPMRAddress:        "address",
+               KafkaBootstrapServers: "servers",
+               ProducerCertPath:      "path",
+               ProducerKeyPath:       "path",
+       }
+       assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
+
+       missingCert := config.Config{
+               InfoProducerHost:      "host",
+               DMaaPMRAddress:        "address",
+               KafkaBootstrapServers: "servers",
+               ProducerKeyPath:       "path",
+       }
+       assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
+
+       missingCertKey := config.Config{
+               InfoProducerHost:      "host",
+               DMaaPMRAddress:        "address",
+               KafkaBootstrapServers: "servers",
+               ProducerCertPath:      "path",
+       }
+       assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
+
+       missingMRAddress := config.Config{
+               InfoProducerHost:      "host",
+               KafkaBootstrapServers: "servers",
+               ProducerCertPath:      "path",
+               ProducerKeyPath:       "path",
+       }
+       assertions.Nil(validateConfiguration(&missingMRAddress))
+
+       missingKafkaServers := config.Config{
+               InfoProducerHost: "host",
+               DMaaPMRAddress:   "address",
+               ProducerCertPath: "path",
+               ProducerKeyPath:  "path",
+       }
+       assertions.Nil(validateConfiguration(&missingKafkaServers))
+
+       missingMRAddressdAndKafkaServers := config.Config{
+               InfoProducerHost: "host",
+               ProducerCertPath: "path",
+               ProducerKeyPath:  "path",
+       }
+       assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
+       assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
+}
+
+func TestRegisterTypesAndProducer(t *testing.T) {
+       assertions := require.New(t)
+
+       wg := sync.WaitGroup{}
+       clientMock := NewTestClient(func(req *http.Request) *http.Response {
+               if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
+                       assertions.Equal(req.Method, "PUT")
+                       body := getBodyAsString(req, t)
+                       assertions.Contains(body, "info_job_data_schema")
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       wg.Done()
+                       return &http.Response{
+                               StatusCode: 200,
+                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+                               Header:     make(http.Header), // Must be set to non-nil value or it panics
+                       }
+               } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
+                       assertions.Equal(req.Method, "PUT")
+                       body := getBodyAsString(req, t)
+                       assertions.Contains(body, "info_job_data_schema")
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       wg.Done()
+                       return &http.Response{
+                               StatusCode: 200,
+                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+                               Header:     make(http.Header), // Must be set to non-nil value or it panics
+                       }
+               } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
+                       assertions.Equal(req.Method, "PUT")
+                       body := getBodyAsString(req, t)
+                       assertions.Contains(body, "callbackAddress/health_check")
+                       assertions.Contains(body, "callbackAddress/info_job")
+                       assertions.Contains(body, "Kafka_TestTopic")
+                       assertions.Contains(body, "STD_Fault_Messages")
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       wg.Done()
+                       return &http.Response{
+                               StatusCode: 200,
+                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+                               Header:     make(http.Header), // Must be set to non-nil value or it panics
+                       }
+               }
+               t.Error("Wrong call to client: ", req)
+               t.Fail()
+               return nil
+       })
+       jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
+
+       wg.Add(3)
+       err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
+
+       assertions.Nil(err)
+
+       if waitTimeout(&wg, 2*time.Second) {
+               t.Error("Not all calls to server were made")
+               t.Fail()
+       }
+}
+
+type RoundTripFunc func(req *http.Request) *http.Response
+
+func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
+       return f(req), nil
+}
+
+//NewTestClient returns *http.Client with Transport replaced to avoid making real calls
+func NewTestClient(fn RoundTripFunc) *http.Client {
+       return &http.Client{
+               Transport: RoundTripFunc(fn),
+       }
+}
+
+func getBodyAsString(req *http.Request, t *testing.T) string {
+       buf := new(bytes.Buffer)
+       if _, err := buf.ReadFrom(req.Body); err != nil {
+               t.Fail()
+       }
+       return buf.String()
+}
+
+// waitTimeout waits for the waitgroup for the specified max timeout.
+// Returns true if waiting timed out.
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               return false // completed normally
+       case <-time.After(timeout):
+               return true // timed out
+       }
+}