"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
)
-func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerLoadTypesFromConfiguration_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
supportedTypes := managerUnderTest.GetSupportedTypes()
assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
+ assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType)
+ assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType)
}
func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error())
+ assertions.Equal("missing required job identity: { {{0 0}} type1 }", err.Error())
}
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error())
+ assertions.Equal("missing required target URI: { job1 {{0 0}} type1 }", err.Error())
}
func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
if req.URL.String() == "http://consumerHost/kafkatarget" {
assertions.Equal(req.Method, "POST")
assertions.Equal(kafkaMessages, getBodyAsString(req, t))
- assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ assertions.Equal("text/plain", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
StatusCode: 200,
assertions.Equal(req.Method, "POST")
assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
messageNo++
- assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ assertions.Equal("text/plain", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
StatusCode: 200,
return nil
})
- jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+ jobUnderTest := newJob(JobInfo{
+ sourceType: kafkaSource,
+ TargetUri: "http://consumerHost/target",
+ }, distributeClientMock)
wg.Add(2)
go jobUnderTest.start()
distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://consumerHost/target" {
assertions.Equal(req.Method, "POST")
- assertions.Equal("12", getBodyAsString(req, t))
+ assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t))
assertions.Equal("application/json", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
go jobUnderTest.start()
go func() {
- jobUnderTest.messagesChannel <- []byte("1")
- jobUnderTest.messagesChannel <- []byte("2")
+ jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
+ jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
+ jobUnderTest.messagesChannel <- []byte("ABCDEFGH")
}()
if waitTimeout(&wg, 2*time.Second) {
MaxTimeMiliseconds: 200,
})
- assertions.Equal([]byte("01"), msgs)
+ assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
}
func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
assertions := require.New(t)
MaxTimeMiliseconds: 30,
})
- assertions.Equal([]byte("01"), msgs)
+ assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
}
func fillMessagesBuffer(mc chan []byte) {