import (
"fmt"
+ "strings"
"sync"
"time"
jobsHandler *jobsHandler
}
+type sourceType string
+
+const dMaaPSource = sourceType("dmaap")
+const kafkaSource = sourceType("kafka")
+
type JobInfo struct {
Owner string `json:"owner"`
LastUpdated string `json:"last_updated"`
TargetUri string `json:"target_uri"`
InfoJobData Parameters `json:"info_job_data"`
InfoTypeIdentity string `json:"info_type_identity"`
+ sourceType sourceType
}
type JobTypesManager interface {
func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
if err := jm.validateJobInfo(ji); err == nil {
typeData := jm.allTypes[ji.InfoTypeIdentity]
+ ji.sourceType = typeData.jobsHandler.sourceType
typeData.jobsHandler.addJobCh <- ji
log.Debug("Added job: ", ji)
return nil
type jobsHandler struct {
mu sync.Mutex
typeId string
+ sourceType sourceType
pollingAgent pollingAgent
jobs map[string]job
addJobCh chan JobInfo
func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
+ sourceType := kafkaSource
+ if typeDef.DMaaPTopicURL != "" {
+ sourceType = dMaaPSource
+ }
return &jobsHandler{
typeId: typeDef.Identity,
+ sourceType: sourceType,
pollingAgent: pollingAgent,
jobs: make(map[string]job),
addJobCh: make(chan JobInfo),
}
func (j *job) start() {
- if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
- j.startReadingSingleMessages()
- } else {
+ if j.isJobBuffered() {
j.startReadingMessagesBuffered()
+ } else {
+ j.startReadingSingleMessages()
}
}
func (j *job) read(bufferParams BufferTimeout) []byte {
wg := sync.WaitGroup{}
wg.Add(bufferParams.MaxSize)
- var msgs []byte
+ rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
c := make(chan struct{})
go func() {
i := 0
case <-c:
break out
case msg := <-j.messagesChannel:
+ rawMsgs = append(rawMsgs, msg)
i++
- msgs = append(msgs, msg...)
wg.Done()
if i == bufferParams.MaxSize {
break out
}()
j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
close(c)
- return msgs
+ return getAsJSONArray(rawMsgs)
+}
+
+func getAsJSONArray(rawMsgs [][]byte) []byte {
+ json := `"[`
+ for i := 0; i < len(rawMsgs); i++ {
+ msg := string(rawMsgs[i])
+ json = json + strings.ReplaceAll(msg, "\"", "\\\"")
+ if i < len(rawMsgs)-1 {
+ json = json + ","
+ }
+ }
+ return []byte(json + `]"`)
}
func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
func (j *job) sendMessagesToConsumer(messages []byte) {
log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
- if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+ contentType := restclient.ContentTypeJSON
+ if j.isJobKafka() && !j.isJobBuffered() {
+ contentType = restclient.ContentTypePlain
+ }
+ if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
return
}
log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
}
+
+func (j *job) isJobBuffered() bool {
+ return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
+}
+
+func (j *job) isJobKafka() bool {
+ return j.jobInfo.sourceType == kafkaSource
+}
supportedTypes := managerUnderTest.GetSupportedTypes()
assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
+ assertions.Equal(dMaaPSource, managerUnderTest.allTypes["type1"].jobsHandler.sourceType)
+ assertions.Equal(kafkaSource, managerUnderTest.allTypes["type2"].jobsHandler.sourceType)
}
func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error())
+ assertions.Equal("missing required job identity: { {{0 0}} type1 }", err.Error())
}
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error())
+ assertions.Equal("missing required target URI: { job1 {{0 0}} type1 }", err.Error())
}
func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
if req.URL.String() == "http://consumerHost/kafkatarget" {
assertions.Equal(req.Method, "POST")
assertions.Equal(kafkaMessages, getBodyAsString(req, t))
- assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ assertions.Equal("text/plain", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
StatusCode: 200,
assertions.Equal(req.Method, "POST")
assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
messageNo++
- assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ assertions.Equal("text/plain", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
StatusCode: 200,
return nil
})
- jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+ jobUnderTest := newJob(JobInfo{
+ sourceType: kafkaSource,
+ TargetUri: "http://consumerHost/target",
+ }, distributeClientMock)
wg.Add(2)
go jobUnderTest.start()
distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://consumerHost/target" {
assertions.Equal(req.Method, "POST")
- assertions.Equal("12", getBodyAsString(req, t))
+ assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t))
assertions.Equal("application/json", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
go jobUnderTest.start()
go func() {
- jobUnderTest.messagesChannel <- []byte("1")
- jobUnderTest.messagesChannel <- []byte("2")
+ jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
+ jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
}()
if waitTimeout(&wg, 2*time.Second) {
MaxTimeMiliseconds: 200,
})
- assertions.Equal([]byte("01"), msgs)
+ assertions.Equal([]byte("\"[0,1]\""), msgs)
}
func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
assertions := require.New(t)
MaxTimeMiliseconds: 30,
})
- assertions.Equal([]byte("01"), msgs)
+ assertions.Equal([]byte("\"[0,1]\""), msgs)
}
func fillMessagesBuffer(mc chan []byte) {
log "github.com/sirupsen/logrus"
)
+const ContentTypeJSON = "application/json"
+const ContentTypePlain = "text/plain"
+
// HTTPClient interface
type HTTPClient interface {
Get(url string) (*http.Response, error)
}
func Put(url string, body []byte, client HTTPClient) error {
- return do(http.MethodPut, url, body, client)
+ return do(http.MethodPut, url, body, ContentTypeJSON, client)
}
-func Post(url string, body []byte, client HTTPClient) error {
- return do(http.MethodPost, url, body, client)
+func Post(url string, body []byte, contentType string, client HTTPClient) error {
+ return do(http.MethodPost, url, body, contentType, client)
}
-func do(method string, url string, body []byte, client HTTPClient) error {
+func do(method string, url string, body []byte, contentType string, client HTTPClient) error {
if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
- req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Content-Type", contentType)
if response, respErr := client.Do(req); respErr == nil {
if isResponseSuccess(response.StatusCode) {
return nil
StatusCode: http.StatusOK,
}, nil)
- if err := Post("http://localhost:9990", []byte("body"), &clientMock); err != nil {
+ if err := Post("http://localhost:9990", []byte("body"), "application/json", &clientMock); err != nil {
t.Errorf("Put() error = %v, did not want error", err)
}
var actualRequest *http.Request
StatusCode: tt.args.mockReturnStatus,
Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)),
}, tt.args.mockReturnError)
- err := do("PUT", tt.args.url, nil, &clientMock)
+ err := do("PUT", tt.args.url, nil, "", &clientMock)
assertions.Equal(tt.wantErr, err, tt.name)
})
}