2 // ========================LICENSE_START=================================
5 // Copyright (C) 2022: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
32 "github.com/stretchr/testify/require"
33 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
34 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
35 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
38 // This is not a real test, just a way to get the Swagger documentation generated automatically.
39 // Hence there are no assertions in this test.
// The script path below is relative, so it is resolved against the working
// directory of the test process (normally the package directory under go test).
40 func TestGenerateSwaggerDocs(t *testing.T) {
// Build the command for the Swagger generation script.
41 cmd := exec.Command("./generate_swagger_docs.sh")
// TestValidateConfiguration feeds validateConfiguration a series of
// config.Config values, each with one or more fields left empty, and checks
// whether an error is returned and which setting name the error mentions.
46 func TestValidateConfiguration(t *testing.T) {
// require-style assertions abort the test at the first failure.
47 assertions := require.New(t)
// Case: every field set -> no error.
49 validConfig := config.Config{
50 InfoProducerHost: "host",
51 DMaaPMRAddress: "address",
52 KafkaBootstrapServers: "servers",
53 ProducerCertPath: "path",
54 ProducerKeyPath: "path",
56 assertions.Nil(validateConfiguration(&validConfig))
// Case: InfoProducerHost left empty -> error mentions INFO_PRODUCER_HOST.
58 missingProducerHost := config.Config{
59 DMaaPMRAddress: "address",
60 KafkaBootstrapServers: "servers",
61 ProducerCertPath: "path",
62 ProducerKeyPath: "path",
64 assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
// Case: ProducerCertPath left empty -> error mentions PRODUCER_CERT.
66 missingCert := config.Config{
67 InfoProducerHost: "host",
68 DMaaPMRAddress: "address",
69 KafkaBootstrapServers: "servers",
70 ProducerKeyPath: "path",
72 assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
// Case: ProducerKeyPath left empty -> error mentions PRODUCER_KEY.
74 missingCertKey := config.Config{
75 InfoProducerHost: "host",
76 DMaaPMRAddress: "address",
77 KafkaBootstrapServers: "servers",
78 ProducerCertPath: "path",
80 assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
// Case: only DMaaPMRAddress left empty -> still valid (Kafka servers are set).
82 missingMRAddress := config.Config{
83 InfoProducerHost: "host",
84 KafkaBootstrapServers: "servers",
85 ProducerCertPath: "path",
86 ProducerKeyPath: "path",
88 assertions.Nil(validateConfiguration(&missingMRAddress))
// Case: only KafkaBootstrapServers left empty -> still valid (MR address is set).
90 missingKafkaServers := config.Config{
91 InfoProducerHost: "host",
92 DMaaPMRAddress: "address",
93 ProducerCertPath: "path",
94 ProducerKeyPath: "path",
96 assertions.Nil(validateConfiguration(&missingKafkaServers))
// Case: both DMaaPMRAddress and KafkaBootstrapServers left empty -> the error
// mentions both setting names.
98 missingMRAddressdAndKafkaServers := config.Config{
99 InfoProducerHost: "host",
100 ProducerCertPath: "path",
101 ProducerKeyPath: "path",
103 assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
// NOTE(review): "BOOTSRAP" looks like a misspelling of "BOOTSTRAP"; presumably it
// mirrors the text produced by validateConfiguration - confirm before changing either side.
104 assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
// TestRegisterTypesAndProducer runs registerTypesAndProducer against a mocked
// HTTP client and checks the requests it sends: one PUT per info type
// (STD_Fault_Messages, Kafka_TestTopic) and one PUT for the producer
// DMaaP_Mediator_Producer. Any other URL is reported as a test error.
107 func TestRegisterTypesAndProducer(t *testing.T) {
108 assertions := require.New(t)
// wg is waited on (with a timeout) at the end of the test.
110 wg := sync.WaitGroup{}
// The mock routes on the full request URL; `configuration` is not declared in
// this function (presumably package-level - confirm).
111 clientMock := NewTestClient(func(req *http.Request) *http.Response {
// Registration of info type STD_Fault_Messages.
112 if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
// NOTE(review): testify's Equal takes (expected, actual); the arguments are
// swapped here and in the two branches below, which only affects the wording
// of a failure message.
113 assertions.Equal(req.Method, "PUT")
114 body := getBodyAsString(req, t)
115 assertions.Contains(body, "info_job_data_schema")
116 assertions.Equal("application/json", req.Header.Get("Content-Type"))
118 return &http.Response{
// NOTE(review): ioutil.NopCloser is deprecated since Go 1.16 in favour of io.NopCloser.
120 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
121 Header: make(http.Header), // Must be set to non-nil value or it panics
// Registration of info type Kafka_TestTopic.
123 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
124 assertions.Equal(req.Method, "PUT")
125 body := getBodyAsString(req, t)
126 assertions.Contains(body, "info_job_data_schema")
127 assertions.Equal("application/json", req.Header.Get("Content-Type"))
129 return &http.Response{
131 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
132 Header: make(http.Header), // Must be set to non-nil value or it panics
// Registration of the producer itself: the body must carry both callback URLs
// (built from the "callbackAddress" argument passed below) and both type names.
134 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
135 assertions.Equal(req.Method, "PUT")
136 body := getBodyAsString(req, t)
137 assertions.Contains(body, "callbackAddress/health_check")
138 assertions.Contains(body, "callbackAddress/info_job")
139 assertions.Contains(body, "Kafka_TestTopic")
140 assertions.Contains(body, "STD_Fault_Messages")
141 assertions.Equal("application/json", req.Header.Get("Content-Type"))
143 return &http.Response{
145 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
146 Header: make(http.Header), // Must be set to non-nil value or it panics
// Any request that matched none of the URLs above is unexpected.
149 t.Error("Wrong call to client: ", req)
// The jobs manager and the registration call share the same mocked client.
153 jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
156 err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
// Fail if wg has not completed within two seconds.
160 if waitTimeout(&wg, 2*time.Second) {
161 t.Error("Not all calls to server were made")
// RoundTripFunc is a function that maps an *http.Request to an *http.Response.
// Its RoundTrip method has the http.RoundTripper signature, so a RoundTripFunc
// can be installed as the Transport of an http.Client (see NewTestClient).
166 type RoundTripFunc func(req *http.Request) *http.Response
// RoundTrip gives RoundTripFunc the method signature required by
// http.RoundTripper, which lets it serve as an http.Client Transport.
168 func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
172 // NewTestClient returns an *http.Client with its Transport replaced by fn, to avoid making real calls.
173 func NewTestClient(fn RoundTripFunc) *http.Client {
// fn already has type RoundTripFunc, so the conversion is a no-op.
175 Transport: RoundTripFunc(fn),
// getBodyAsString reads the body of req into an in-memory buffer so the test
// can inspect it as a string. The body is read to EOF, so it cannot be read
// again from the same request afterwards.
179 func getBodyAsString(req *http.Request, t *testing.T) string {
180 buf := new(bytes.Buffer)
// ReadFrom reads from req.Body until EOF; a read error is handled in this branch.
181 if _, err := buf.ReadFrom(req.Body); err != nil {
187 // waitTimeout waits for the waitgroup for the specified max timeout.
188 // Returns true if waiting timed out.
189 func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
// c is an unbuffered channel used purely for signalling; it carries no data.
190 c := make(chan struct{})
197 return false // completed normally
// time.After delivers a value once timeout has elapsed.
198 case <-time.After(timeout):
199 return true // timed out