X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs.go;h=867894f76f279ea073f4304cfc254e12db6c1297;hb=ba93486b43fb1cda492ba27ae4637a6e0160ae66;hp=6dad5fd9769f8da8adca72d5dabbca518ad76c8e;hpb=46a0fd717e5f49ebae6cb2c4fbcf54f0e329dc86;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 6dad5fd9..867894f7 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -21,10 +21,9 @@ package jobs import ( - "encoding/json" "fmt" - "os" "sync" + "time" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" @@ -47,7 +46,7 @@ type JobInfo struct { } type JobTypesManager interface { - LoadTypesFromConfiguration() ([]config.TypeDefinition, error) + LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition GetSupportedTypes() []string } @@ -57,16 +56,14 @@ type JobsManager interface { } type JobsManagerImpl struct { - configFile string allTypes map[string]TypeData pollClient restclient.HTTPClient mrAddress string distributeClient restclient.HTTPClient } -func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { +func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { return &JobsManagerImpl{ - configFile: typeConfigFilePath, allTypes: make(map[string]TypeData), pollClient: pollClient, mrAddress: mrAddr, @@ -107,26 +104,15 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { return nil } -func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) { - typeDefsByte, err := os.ReadFile(jm.configFile) - if err != nil { - return nil, err - } - typeDefs := struct { - Types []config.TypeDefinition `json:"types"` - }{} - err = json.Unmarshal(typeDefsByte, &typeDefs) - if err != nil { - return nil, err - } - for _, typeDef := range typeDefs.Types { +func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition { + for _, typeDef := range types { jm.allTypes[typeDef.Id] = TypeData{ TypeId: typeDef.Id, DMaaPTopicURL: typeDef.DmaapTopicURL, jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient), } } - return typeDefs.Types, nil + return types } func (jm *JobsManagerImpl) GetSupportedTypes() []string { @@ -187,6 +173,7 @@ func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) { messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient) if error != nil { log.Warn("Error getting data from MR. Cause: ", error) + time.Sleep(time.Minute) // Must wait before trying to call MR again } log.Debug("Received messages: ", string(messagesBody)) jh.distributeMessages(messagesBody)