Merge "ODU slice assurance usecase - Improvements"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
index 6dad5fd..867894f 100644 (file)
 package jobs
 
 import (
-       "encoding/json"
        "fmt"
-       "os"
        "sync"
+       "time"
 
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
@@ -47,7 +46,7 @@ type JobInfo struct {
 }
 
 type JobTypesManager interface {
-       LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
+       LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
        GetSupportedTypes() []string
 }
 
@@ -57,16 +56,14 @@ type JobsManager interface {
 }
 
 type JobsManagerImpl struct {
-       configFile       string
        allTypes         map[string]TypeData
        pollClient       restclient.HTTPClient
        mrAddress        string
        distributeClient restclient.HTTPClient
 }
 
-func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
-               configFile:       typeConfigFilePath,
                allTypes:         make(map[string]TypeData),
                pollClient:       pollClient,
                mrAddress:        mrAddr,
@@ -107,26 +104,15 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
        return nil
 }
 
-func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
-       typeDefsByte, err := os.ReadFile(jm.configFile)
-       if err != nil {
-               return nil, err
-       }
-       typeDefs := struct {
-               Types []config.TypeDefinition `json:"types"`
-       }{}
-       err = json.Unmarshal(typeDefsByte, &typeDefs)
-       if err != nil {
-               return nil, err
-       }
-       for _, typeDef := range typeDefs.Types {
+func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
+       for _, typeDef := range types {
                jm.allTypes[typeDef.Id] = TypeData{
                        TypeId:        typeDef.Id,
                        DMaaPTopicURL: typeDef.DmaapTopicURL,
                        jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
                }
        }
-       return typeDefs.Types, nil
+       return types
 }
 
 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
@@ -187,6 +173,7 @@ func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
        messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
        if error != nil {
                log.Warn("Error getting data from MR. Cause: ", error)
+               time.Sleep(time.Minute) // Must wait before trying to call MR again
        }
        log.Debug("Received messages: ", string(messagesBody))
        jh.distributeMessages(messagesBody)