Merge "Updates for F release"
[nonrtric.git] / dmaap-mediator-producer / main_test.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package main
22
23 import (
24         "bytes"
25         "fmt"
26         "io/ioutil"
27         "net/http"
28         "os/exec"
29         "sync"
30         "testing"
31         "time"
32
33         "github.com/stretchr/testify/require"
34         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
35         "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
36         "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
37 )
38
// TestGenerateSwaggerDocs is a hook rather than a real test: running the test
// suite executes ./generate_swagger_docs.sh so that the Swagger documentation
// gets generated automatically.
//
// It makes no assertions. If the script cannot be run or exits with an error,
// the problem is only printed to standard output and the test still passes.
func TestGenerateSwaggerDocs(t *testing.T) {
	if err := exec.Command("./generate_swagger_docs.sh").Run(); err != nil {
		fmt.Println("Error generating Swagger:", err)
	}
}
49
50 func TestValidateConfiguration(t *testing.T) {
51         assertions := require.New(t)
52
53         validConfig := config.Config{
54                 InfoProducerHost:      "host",
55                 DMaaPMRAddress:        "address",
56                 KafkaBootstrapServers: "servers",
57                 ProducerCertPath:      "path",
58                 ProducerKeyPath:       "path",
59         }
60         assertions.Nil(validateConfiguration(&validConfig))
61
62         missingProducerHost := config.Config{
63                 DMaaPMRAddress:        "address",
64                 KafkaBootstrapServers: "servers",
65                 ProducerCertPath:      "path",
66                 ProducerKeyPath:       "path",
67         }
68         assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
69
70         missingCert := config.Config{
71                 InfoProducerHost:      "host",
72                 DMaaPMRAddress:        "address",
73                 KafkaBootstrapServers: "servers",
74                 ProducerKeyPath:       "path",
75         }
76         assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
77
78         missingCertKey := config.Config{
79                 InfoProducerHost:      "host",
80                 DMaaPMRAddress:        "address",
81                 KafkaBootstrapServers: "servers",
82                 ProducerCertPath:      "path",
83         }
84         assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
85
86         missingMRAddress := config.Config{
87                 InfoProducerHost:      "host",
88                 KafkaBootstrapServers: "servers",
89                 ProducerCertPath:      "path",
90                 ProducerKeyPath:       "path",
91         }
92         assertions.Nil(validateConfiguration(&missingMRAddress))
93
94         missingKafkaServers := config.Config{
95                 InfoProducerHost: "host",
96                 DMaaPMRAddress:   "address",
97                 ProducerCertPath: "path",
98                 ProducerKeyPath:  "path",
99         }
100         assertions.Nil(validateConfiguration(&missingKafkaServers))
101
102         missingMRAddressdAndKafkaServers := config.Config{
103                 InfoProducerHost: "host",
104                 ProducerCertPath: "path",
105                 ProducerKeyPath:  "path",
106         }
107         assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
108         assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
109 }
110
111 func TestRegisterTypesAndProducer(t *testing.T) {
112         assertions := require.New(t)
113
114         wg := sync.WaitGroup{}
115         clientMock := NewTestClient(func(req *http.Request) *http.Response {
116                 if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
117                         assertions.Equal(req.Method, "PUT")
118                         body := getBodyAsString(req, t)
119                         assertions.Contains(body, "info_job_data_schema")
120                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
121                         wg.Done()
122                         return &http.Response{
123                                 StatusCode: 200,
124                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
125                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
126                         }
127                 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
128                         assertions.Equal(req.Method, "PUT")
129                         body := getBodyAsString(req, t)
130                         assertions.Contains(body, "info_job_data_schema")
131                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
132                         wg.Done()
133                         return &http.Response{
134                                 StatusCode: 200,
135                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
136                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
137                         }
138                 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
139                         assertions.Equal(req.Method, "PUT")
140                         body := getBodyAsString(req, t)
141                         assertions.Contains(body, "callbackAddress/health_check")
142                         assertions.Contains(body, "callbackAddress/info_job")
143                         assertions.Contains(body, "Kafka_TestTopic")
144                         assertions.Contains(body, "STD_Fault_Messages")
145                         assertions.Equal("application/json", req.Header.Get("Content-Type"))
146                         wg.Done()
147                         return &http.Response{
148                                 StatusCode: 200,
149                                 Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
150                                 Header:     make(http.Header), // Must be set to non-nil value or it panics
151                         }
152                 }
153                 t.Error("Wrong call to client: ", req)
154                 t.Fail()
155                 return nil
156         })
157         jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
158
159         wg.Add(3)
160         err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
161
162         assertions.Nil(err)
163
164         if waitTimeout(&wg, 2*time.Second) {
165                 t.Error("Not all calls to server were made")
166                 t.Fail()
167         }
168 }
169
// RoundTripFunc adapts an ordinary function to the http.RoundTripper
// interface so that a test can answer HTTP requests in-process.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip implements http.RoundTripper by delegating to f.
//
// It returns the response produced by f together with a nil error. When f
// returns nil, which the stubs in this file use to signal an unexpected
// request, an error is returned instead, because a RoundTripper must not
// hand back a nil response with a nil error.
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp := f(req)
	if resp == nil {
		return nil, fmt.Errorf("no stubbed response for %s %s", req.Method, req.URL)
	}
	return resp, nil
}
175
176 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
177 func NewTestClient(fn RoundTripFunc) *http.Client {
178         return &http.Client{
179                 Transport: RoundTripFunc(fn),
180         }
181 }
182
// getBodyAsString reads the whole body of req and returns it as a string.
//
// A request without a body (req.Body == nil) yields the empty string. If
// reading fails, the test t is marked as failed with a descriptive message
// and whatever was read up to that point is returned.
func getBodyAsString(req *http.Request, t *testing.T) string {
	t.Helper()

	// Requests built without a body have a nil Body; reading from it would panic.
	if req.Body == nil {
		return ""
	}

	buf := new(bytes.Buffer)
	if _, err := buf.ReadFrom(req.Body); err != nil {
		t.Errorf("reading request body: %v", err)
	}
	return buf.String()
}
190
// waitTimeout blocks until wg completes or until timeout has elapsed,
// whichever happens first.
//
// It returns false when wg completed in time and true when the wait timed
// out. On a timeout the helper goroutine stays blocked in wg.Wait until the
// WaitGroup eventually completes.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true // timed out
	case <-done:
		return false // completed normally
	}
}