Update documentation for DMaaP Mediator Producer
[nonrtric.git] / dmaap-mediator-producer / main_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package main
22
23 import (
24         "bytes"
25         "io/ioutil"
26         "net/http"
27         "os/exec"
28         "sync"
29         "testing"
30         "time"
31
32         "github.com/stretchr/testify/require"
33         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
35         "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
36 )
37
// TestGenerateSwaggerDocs is not a real test. It is only a hook that runs
// ./generate_swagger_docs.sh so the Swagger documentation is regenerated
// automatically whenever the tests are executed.
// Hence there are no assertions: a failing script is logged, never fatal.
func TestGenerateSwaggerDocs(t *testing.T) {
	cmd := exec.Command("./generate_swagger_docs.sh")

	// Best effort: surface the problem in the test log instead of dropping it,
	// but do not fail the test run because of it.
	if err := cmd.Run(); err != nil {
		t.Logf("generating Swagger docs: %v", err)
	}
}
45
46 func TestValidateConfiguration(t *testing.T) {
47         assertions := require.New(t)
48
49         validConfig := config.Config{
50                 InfoProducerHost:      "host",
51                 DMaaPMRAddress:        "address",
52                 KafkaBootstrapServers: "servers",
53                 ProducerCertPath:      "path",
54                 ProducerKeyPath:       "path",
55         }
56         assertions.Nil(validateConfiguration(&validConfig))
57
58         missingProducerHost := config.Config{
59                 DMaaPMRAddress:        "address",
60                 KafkaBootstrapServers: "servers",
61                 ProducerCertPath:      "path",
62                 ProducerKeyPath:       "path",
63         }
64         assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
65
66         missingCert := config.Config{
67                 InfoProducerHost:      "host",
68                 DMaaPMRAddress:        "address",
69                 KafkaBootstrapServers: "servers",
70                 ProducerKeyPath:       "path",
71         }
72         assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
73
74         missingCertKey := config.Config{
75                 InfoProducerHost:      "host",
76                 DMaaPMRAddress:        "address",
77                 KafkaBootstrapServers: "servers",
78                 ProducerCertPath:      "path",
79         }
80         assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
81
82         missingMRAddress := config.Config{
83                 InfoProducerHost:      "host",
84                 KafkaBootstrapServers: "servers",
85                 ProducerCertPath:      "path",
86                 ProducerKeyPath:       "path",
87         }
88         assertions.Nil(validateConfiguration(&missingMRAddress))
89
90         missingKafkaServers := config.Config{
91                 InfoProducerHost: "host",
92                 DMaaPMRAddress:   "address",
93                 ProducerCertPath: "path",
94                 ProducerKeyPath:  "path",
95         }
96         assertions.Nil(validateConfiguration(&missingKafkaServers))
97
98         missingMRAddressdAndKafkaServers := config.Config{
99                 InfoProducerHost: "host",
100                 ProducerCertPath: "path",
101                 ProducerKeyPath:  "path",
102         }
103         assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
104         assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
105 }
106
107 func TestRegisterTypesAndProducer(t *testing.T) {
108         assertions := require.New(t)
109
110         wg := sync.WaitGroup{}
111         clientMock := NewTestClient(func(req *http.Request) *http.Response {
112                 if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
113                         assertions.Equal(req.Method, "PUT")
114                         body := getBodyAsString(req, t)
115                         assertions.Contains(body, "info_job_data_schema")
116                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
117                         wg.Done()
118                         return &http.Response{
119                                 StatusCode: 200,
120                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
121                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
122                         }
123                 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
124                         assertions.Equal(req.Method, "PUT")
125                         body := getBodyAsString(req, t)
126                         assertions.Contains(body, "info_job_data_schema")
127                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
128                         wg.Done()
129                         return &http.Response{
130                                 StatusCode: 200,
131                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
132                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
133                         }
134                 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
135                         assertions.Equal(req.Method, "PUT")
136                         body := getBodyAsString(req, t)
137                         assertions.Contains(body, "callbackAddress/health_check")
138                         assertions.Contains(body, "callbackAddress/info_job")
139                         assertions.Contains(body, "Kafka_TestTopic")
140                         assertions.Contains(body, "STD_Fault_Messages")
141                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
142                         wg.Done()
143                         return &http.Response{
144                                 StatusCode: 200,
145                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
146                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
147                         }
148                 }
149                 t.Error("Wrong call to client: ", req)
150                 t.Fail()
151                 return nil
152         })
153         jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
154
155         wg.Add(3)
156         err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
157
158         assertions.Nil(err)
159
160         if waitTimeout(&wg, 2*time.Second) {
161                 t.Error("Not all calls to server were made")
162                 t.Fail()
163         }
164 }
165
// RoundTripFunc adapts a plain function to the RoundTrip method signature
// expected of an http.Client Transport, so a test can answer requests with a closure.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip calls f with req and returns its response together with a nil error.
// Whatever f returns, including a nil response, is passed through unchanged.
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp := f(req)
	return resp, nil
}
171
172 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
173 func NewTestClient(fn RoundTripFunc) *http.Client {
174         return &http.Client{
175                 Transport: RoundTripFunc(fn),
176         }
177 }
178
// getBodyAsString reads the whole body of req and returns it as a string.
// A request without a body (nil Body) yields the empty string.
// If reading fails, the error is reported on t, the test is marked as failed,
// and whatever was read before the failure is returned.
func getBodyAsString(req *http.Request, t *testing.T) string {
	t.Helper()

	// Client requests created without a body have a nil Body; reading from it would panic.
	if req.Body == nil {
		return ""
	}

	buf := new(bytes.Buffer)
	if _, err := buf.ReadFrom(req.Body); err != nil {
		t.Errorf("reading request body: %v", err)
	}
	return buf.String()
}
186
// waitTimeout blocks until wg's counter reaches zero or until timeout has
// elapsed, whichever happens first.
// It returns false when the wait group finished in time and true on timeout.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done) // signal completion to the select below
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true // timed out
	case <-done:
		return false // completed normally
	}
}