X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs.go;h=867894f76f279ea073f4304cfc254e12db6c1297;hb=9fea4bb800369d2b56b81b315451314fa4482f8c;hp=d4694bf23417c07c739c1ec6705dbc50fd2c9965;hpb=c873fa9306739a14d05454d2ec27bc2fde497058;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index d4694bf2..867894f7 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -21,10 +21,9 @@ package jobs import ( - "encoding/json" "fmt" - "os" "sync" + "time" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" @@ -47,26 +46,24 @@ type JobInfo struct { } type JobTypesManager interface { - LoadTypesFromConfiguration() ([]config.TypeDefinition, error) + LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition GetSupportedTypes() []string } type JobsManager interface { - AddJob(JobInfo) error - DeleteJob(jobId string) + AddJobFromRESTCall(JobInfo) error + DeleteJobFromRESTCall(jobId string) } type JobsManagerImpl struct { - configFile string allTypes map[string]TypeData pollClient restclient.HTTPClient mrAddress string distributeClient restclient.HTTPClient } -func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { +func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { return &JobsManagerImpl{ - configFile: typeConfigFilePath, allTypes: make(map[string]TypeData), pollClient: pollClient, mrAddress: mrAddr, @@ -74,7 +71,7 @@ func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPCli } } -func (jm *JobsManagerImpl) AddJob(ji JobInfo) error { +func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error { if err := jm.validateJobInfo(ji); err == nil { typeData := jm.allTypes[ji.InfoTypeIdentity] typeData.jobsHandler.addJobCh <- ji @@ -85,7 +82,7 @@ func (jm *JobsManagerImpl) AddJob(ji JobInfo) error { } } -func (jm *JobsManagerImpl) DeleteJob(jobId string) { +func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) { for _, typeData := range jm.allTypes { log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId) typeData.jobsHandler.deleteJobCh <- jobId @@ -107,26 +104,15 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { return nil } -func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) { - typeDefsByte, err := os.ReadFile(jm.configFile) - if err != nil { - return nil, err - } - typeDefs := struct { - Types []config.TypeDefinition `json:"types"` - }{} - err = json.Unmarshal(typeDefsByte, &typeDefs) - if err != nil { - return nil, err - } - for _, typeDef := range typeDefs.Types { +func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition { + for _, typeDef := range types { jm.allTypes[typeDef.Id] = TypeData{ TypeId: typeDef.Id, DMaaPTopicURL: typeDef.DmaapTopicURL, jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient), } } - return typeDefs.Types, nil + return types } func (jm *JobsManagerImpl) GetSupportedTypes() []string { @@ -137,10 +123,10 @@ func (jm *JobsManagerImpl) GetSupportedTypes() []string { return supportedTypes } -func (jm *JobsManagerImpl) StartJobs() { +func (jm *JobsManagerImpl) StartJobsForAllTypes() { for _, jobType := range jm.allTypes { - go jobType.jobsHandler.start(jm.mrAddress) + go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress) } } @@ -168,7 +154,7 @@ func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPCl } } -func (jh *jobsHandler) start(mRAddress string) { +func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) { go func() { for { jh.pollAndDistributeMessages(mRAddress) @@ -187,6 +173,7 @@ func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) { messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient) if error != nil { log.Warn("Error getting data from MR. Cause: ", error) + time.Sleep(time.Minute) // Must wait before trying to call MR again } log.Debug("Received messages: ", string(messagesBody)) jh.distributeMessages(messagesBody)