Code Review
/
nonrtric.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Improve Swagger DMaaP Mediator Producer
[nonrtric.git]
/
dmaap-mediator-producer
/
internal
/
jobs
/
jobs_test.go
diff --git
a/dmaap-mediator-producer/internal/jobs/jobs_test.go
b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index
ab1165c
..
7d02104
100644
(file)
--- a/
dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/
dmaap-mediator-producer/internal/jobs/jobs_test.go
@@
-38,7
+38,7
@@
import (
"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
)
"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
)
-func TestJobsManager
GetTypes
_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManager
LoadTypesFromConfiguration
_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
@@
-59,6
+59,8
@@
func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t
supportedTypes := managerUnderTest.GetSupportedTypes()
assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
supportedTypes := managerUnderTest.GetSupportedTypes()
assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
+ assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType)
+ assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType)
}
func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
}
func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
@@
-113,7
+115,7
@@
func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error())
+ assertions.Equal("missing required job identity: { {{0 0}} type1
}", err.Error())
}
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
}
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
@@
-129,7
+131,7
@@
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error())
+ assertions.Equal("missing required target URI: { job1 {{0 0}} type1
}", err.Error())
}
func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
}
func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
@@
-226,7
+228,7
@@
func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *tes
if req.URL.String() == "http://consumerHost/kafkatarget" {
assertions.Equal(req.Method, "POST")
assertions.Equal(kafkaMessages, getBodyAsString(req, t))
if req.URL.String() == "http://consumerHost/kafkatarget" {
assertions.Equal(req.Method, "POST")
assertions.Equal(kafkaMessages, getBodyAsString(req, t))
- assertions.Equal("
application/jso
n", req.Header.Get("Content-Type"))
+ assertions.Equal("
text/plai
n", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
StatusCode: 200,
wg.Done()
return &http.Response{
StatusCode: 200,
@@
-349,7
+351,7
@@
func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
assertions.Equal(req.Method, "POST")
assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
messageNo++
assertions.Equal(req.Method, "POST")
assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
messageNo++
- assertions.Equal("
application/jso
n", req.Header.Get("Content-Type"))
+ assertions.Equal("
text/plai
n", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
StatusCode: 200,
wg.Done()
return &http.Response{
StatusCode: 200,
@@
-362,7
+364,10
@@
func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
return nil
})
return nil
})
- jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+ jobUnderTest := newJob(JobInfo{
+ sourceType: kafkaSource,
+ TargetUri: "http://consumerHost/target",
+ }, distributeClientMock)
wg.Add(2)
go jobUnderTest.start()
wg.Add(2)
go jobUnderTest.start()
@@
-383,7
+388,7
@@
func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://consumerHost/target" {
assertions.Equal(req.Method, "POST")
distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://consumerHost/target" {
assertions.Equal(req.Method, "POST")
- assertions.Equal(
"12"
, getBodyAsString(req, t))
+ assertions.Equal(
`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`
, getBodyAsString(req, t))
assertions.Equal("application/json", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
assertions.Equal("application/json", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
@@
-411,8
+416,9
@@
func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
go jobUnderTest.start()
go func() {
go jobUnderTest.start()
go func() {
- jobUnderTest.messagesChannel <- []byte("1")
- jobUnderTest.messagesChannel <- []byte("2")
+ jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
+ jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
+ jobUnderTest.messagesChannel <- []byte("ABCDEFGH")
}()
if waitTimeout(&wg, 2*time.Second) {
}()
if waitTimeout(&wg, 2*time.Second) {
@@
-437,7
+443,7
@@
func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t
MaxTimeMiliseconds: 200,
})
MaxTimeMiliseconds: 200,
})
- assertions.Equal([]byte("
01
"), msgs)
+ assertions.Equal([]byte("
[\"0\",\"1\"]
"), msgs)
}
func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
assertions := require.New(t)
}
func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
assertions := require.New(t)
@@
-456,7
+462,7
@@
func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t
MaxTimeMiliseconds: 30,
})
MaxTimeMiliseconds: 30,
})
- assertions.Equal([]byte("
01
"), msgs)
+ assertions.Equal([]byte("
[\"0\",\"1\"]
"), msgs)
}
func fillMessagesBuffer(mc chan []byte) {
}
func fillMessagesBuffer(mc chan []byte) {