}
type JobsManager interface {
- AddJob(JobInfo) error
- DeleteJob(jobId string)
+ AddJobFromRESTCall(JobInfo) error
+ DeleteJobFromRESTCall(jobId string)
}
type JobsManagerImpl struct {
}
}
-func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
+func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
if err := jm.validateJobInfo(ji); err == nil {
typeData := jm.allTypes[ji.InfoTypeIdentity]
typeData.jobsHandler.addJobCh <- ji
}
}
-func (jm *JobsManagerImpl) DeleteJob(jobId string) {
+func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
for _, typeData := range jm.allTypes {
log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
typeData.jobsHandler.deleteJobCh <- jobId
return supportedTypes
}
-func (jm *JobsManagerImpl) StartJobs() {
+func (jm *JobsManagerImpl) StartJobsForAllTypes() {
for _, jobType := range jm.allTypes {
- go jobType.jobsHandler.start(jm.mrAddress)
+ go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
}
}
}
}
-func (jh *jobsHandler) start(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
go func() {
for {
jh.pollAndDistributeMessages(mRAddress)
var err error
go func() {
- err = managerUnderTest.AddJob(wantedJob)
+ err = managerUnderTest.AddJobFromRESTCall(wantedJob)
}()
assertions.Nil(err)
InfoTypeIdentity: "type1",
}
- err := managerUnderTest.AddJob(jobInfo)
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
assertions.Equal("type not supported: type1", err.Error())
}
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
- err := managerUnderTest.AddJob(jobInfo)
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
}
InfoTypeIdentity: "type1",
InfoJobIdentity: "job1",
}
- err := managerUnderTest.AddJob(jobInfo)
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
}
jobsHandler: &jobsHandler,
}
- go managerUnderTest.DeleteJob("job2")
+ go managerUnderTest.DeleteJobFromRESTCall("job2")
assertions.Equal("job2", <-jobsHandler.deleteJobCh)
}
jobsHandler: jobsHandler,
}
- jobsManager.StartJobs()
+ jobsManager.StartJobsForAllTypes()
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
wg.Add(1) // Wait till the distribution has happened
- jobsManager.AddJob(jobInfo)
+ jobsManager.AddJobFromRESTCall(jobInfo)
if waitTimeout(&wg, 2*time.Second) {
t.Error("Not all calls to server were made")
http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest)
return
}
- if err := h.jobsManager.AddJob(jobInfo); err != nil {
+ if err := h.jobsManager.AddJobFromRESTCall(jobInfo); err != nil {
http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
}
}
return
}
- h.jobsManager.DeleteJob(id)
+ h.jobsManager.DeleteJobFromRESTCall(id)
}
type notFoundHandler struct{}
if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
- jobsManager.StartJobs()
+ jobsManager.StartJobsForAllTypes()
log.Debug("Starting DMaaP Mediator Producer")
go func() {
}
// AddJob provides a mock function with given fields: _a0
-func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error {
+func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
ret := _m.Called(_a0)
var r0 error
}
// DeleteJob provides a mock function with given fields: jobId
-func (_m *JobHandler) DeleteJob(jobId string) {
+func (_m *JobHandler) DeleteJobFromRESTCall(jobId string) {
_m.Called(jobId)
}