Code Review
/
nonrtric.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Callback to delete job
[nonrtric.git]
/
dmaap-mediator-producer
/
internal
/
jobs
/
jobs.go
diff --git
a/dmaap-mediator-producer/internal/jobs/jobs.go
b/dmaap-mediator-producer/internal/jobs/jobs.go
index
09d3891
..
e5a1070
100644
(file)
--- a/
dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/
dmaap-mediator-producer/internal/jobs/jobs.go
@@
-55,6
+55,7
@@
type JobInfo struct {
type JobHandler interface {
AddJob(JobInfo) error
type JobHandler interface {
AddJob(JobInfo) error
+ DeleteJob(jobId string)
}
var (
}
var (
@@
-87,6
+88,15
@@
func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
}
}
}
}
+func (jh *jobHandlerImpl) DeleteJob(jobId string) {
+ mu.Lock()
+ defer mu.Unlock()
+ for _, typeData := range allTypes {
+ delete(typeData.Jobs, jobId)
+ }
+ log.Debug("Deleted job: ", jobId)
+}
+
func validateJobInfo(ji JobInfo) error {
if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
func validateJobInfo(ji JobInfo) error {
if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
@@
-142,6
+152,10
@@
func AddJob(job JobInfo) error {
return Handler.AddJob(job)
}
return Handler.AddJob(job)
}
+func DeleteJob(jobId string) {
+ Handler.DeleteJob(jobId)
+}
+
func RunJobs(mRAddress string) {
for {
pollAndDistributeMessages(mRAddress)
func RunJobs(mRAddress string) {
for {
pollAndDistributeMessages(mRAddress)