2 // ========================LICENSE_START=================================
5 // Copyright (C) 2022: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
33 "github.com/stretchr/testify/require"
34 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
35 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
36 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
// TestGenerateSwaggerDocs is not a real test. It only exists so that running the
// test suite regenerates the Swagger documentation by executing
// ./generate_swagger_docs.sh. It makes no assertions: if the script cannot be
// run, the error is printed and the test still passes.
func TestGenerateSwaggerDocs(t *testing.T) {
	cmd := exec.Command("./generate_swagger_docs.sh")

	// Run starts the script and waits for it to finish.
	if err := cmd.Run(); err != nil {
		fmt.Println("Error generating Swagger:", err)
	}
}
50 func TestValidateConfiguration(t *testing.T) {
51 assertions := require.New(t)
53 validConfig := config.Config{
54 InfoProducerHost: "host",
55 DMaaPMRAddress: "address",
56 KafkaBootstrapServers: "servers",
57 ProducerCertPath: "path",
58 ProducerKeyPath: "path",
60 assertions.Nil(validateConfiguration(&validConfig))
62 missingProducerHost := config.Config{
63 DMaaPMRAddress: "address",
64 KafkaBootstrapServers: "servers",
65 ProducerCertPath: "path",
66 ProducerKeyPath: "path",
68 assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
70 missingCert := config.Config{
71 InfoProducerHost: "host",
72 DMaaPMRAddress: "address",
73 KafkaBootstrapServers: "servers",
74 ProducerKeyPath: "path",
76 assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
78 missingCertKey := config.Config{
79 InfoProducerHost: "host",
80 DMaaPMRAddress: "address",
81 KafkaBootstrapServers: "servers",
82 ProducerCertPath: "path",
84 assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
86 missingMRAddress := config.Config{
87 InfoProducerHost: "host",
88 KafkaBootstrapServers: "servers",
89 ProducerCertPath: "path",
90 ProducerKeyPath: "path",
92 assertions.Nil(validateConfiguration(&missingMRAddress))
94 missingKafkaServers := config.Config{
95 InfoProducerHost: "host",
96 DMaaPMRAddress: "address",
97 ProducerCertPath: "path",
98 ProducerKeyPath: "path",
100 assertions.Nil(validateConfiguration(&missingKafkaServers))
102 missingMRAddressdAndKafkaServers := config.Config{
103 InfoProducerHost: "host",
104 ProducerCertPath: "path",
105 ProducerKeyPath: "path",
107 assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
108 assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
// TestRegisterTypesAndProducer calls registerTypesAndProducer with a mocked HTTP
// client and checks the requests it sends: one PUT per info type
// (STD_Fault_Messages and Kafka_TestTopic) and one PUT registering the producer
// DMaaP_Mediator_Producer. Any other URL is reported as a test error.
// NOTE(review): closing braces and several statements of this function are not
// present in the lines below (e.g. how wg is incremented/decremented, the
// response status code, and the check on err), so they are not described here -
// confirm against the complete file.
111 func TestRegisterTypesAndProducer(t *testing.T) {
112 assertions := require.New(t)
// wg is passed to waitTimeout at the end to detect that not all expected calls were made.
114 wg := sync.WaitGroup{}
// The mock transport dispatches on the full request URL and validates each request.
115 clientMock := NewTestClient(func(req *http.Request) *http.Response {
// Registration of the STD_Fault_Messages info type.
116 if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
// NOTE(review): testify's Equal is documented as (expected, actual); the arguments
// look swapped here compared with the Content-Type check below. That would only
// affect the wording of a failure message - confirm and consider swapping.
117 assertions.Equal(req.Method, "PUT")
118 body := getBodyAsString(req, t)
119 assertions.Contains(body, "info_job_data_schema")
120 assertions.Equal("application/json", req.Header.Get("Content-Type"))
122 return &http.Response{
124 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
125 Header: make(http.Header), // Must be set to non-nil value or it panics
// Registration of the Kafka_TestTopic info type; same checks as above.
127 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
128 assertions.Equal(req.Method, "PUT")
129 body := getBodyAsString(req, t)
130 assertions.Contains(body, "info_job_data_schema")
131 assertions.Equal("application/json", req.Header.Get("Content-Type"))
133 return &http.Response{
135 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
136 Header: make(http.Header), // Must be set to non-nil value or it panics
// Registration of the producer itself: the body must mention both callback URLs
// (built from the "callbackAddress" argument passed below) and both info types.
138 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
139 assertions.Equal(req.Method, "PUT")
140 body := getBodyAsString(req, t)
141 assertions.Contains(body, "callbackAddress/health_check")
142 assertions.Contains(body, "callbackAddress/info_job")
143 assertions.Contains(body, "Kafka_TestTopic")
144 assertions.Contains(body, "STD_Fault_Messages")
145 assertions.Equal("application/json", req.Header.Get("Content-Type"))
147 return &http.Response{
149 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
150 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any URL other than the three above is an unexpected call.
153 t.Error("Wrong call to client: ", req)
// The same mocked client is given to the jobs manager and to the function under test.
157 jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
160 err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
// Allow up to two seconds for the expected calls before reporting an error.
164 if waitTimeout(&wg, 2*time.Second) {
165 t.Error("Not all calls to server were made")
// RoundTripFunc adapts an ordinary function to the http.RoundTripper interface,
// so a test can answer HTTP requests itself without any network traffic.
type RoundTripFunc func(req *http.Request) *http.Response

// RoundTrip implements http.RoundTripper by delegating to f. The returned error
// is always nil; the response is whatever f returns for req.
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return f(req), nil
}
176 //NewTestClient returns *http.Client with Transport replaced to avoid making real calls
177 func NewTestClient(fn RoundTripFunc) *http.Client {
179 Transport: RoundTripFunc(fn),
// getBodyAsString reads req.Body to the end and returns its content as a string.
//
// A request without a body yields "". A read error is reported on t, which marks
// the test as failed, and whatever was read before the error is returned.
func getBodyAsString(req *http.Request, t *testing.T) string {
	t.Helper()

	if req.Body == nil {
		// Client requests created without a body have a nil Body; reading from it
		// would panic.
		return ""
	}

	buf := new(bytes.Buffer)
	if _, err := buf.ReadFrom(req.Body); err != nil {
		// Errorf rather than Fatalf: this helper may run on a goroutine other than
		// the one running the test.
		t.Errorf("reading request body: %v", err)
	}
	return buf.String()
}
// waitTimeout waits for wg to finish, but for no longer than timeout.
// It returns true if waiting timed out and false if wg completed in time.
//
// After a timeout the helper goroutine remains blocked in wg.Wait until the
// WaitGroup counter eventually reaches zero.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	// An explicit timer, unlike time.After, can be stopped as soon as the wait
	// completes.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return false // completed normally
	case <-timer.C:
		return true // timed out
	}
}