Merge "Change of ECS to ICS in test env"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
index d4694bf..b6616a1 100644 (file)
@@ -21,9 +21,7 @@
 package jobs
 
 import (
-       "encoding/json"
        "fmt"
-       "os"
        "sync"
 
        log "github.com/sirupsen/logrus"
@@ -47,26 +45,24 @@ type JobInfo struct {
 }
 
 type JobTypesManager interface {
-       LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
+       LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
        GetSupportedTypes() []string
 }
 
 type JobsManager interface {
-       AddJob(JobInfo) error
-       DeleteJob(jobId string)
+       AddJobFromRESTCall(JobInfo) error
+       DeleteJobFromRESTCall(jobId string)
 }
 
 type JobsManagerImpl struct {
-       configFile       string
        allTypes         map[string]TypeData
        pollClient       restclient.HTTPClient
        mrAddress        string
        distributeClient restclient.HTTPClient
 }
 
-func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
-               configFile:       typeConfigFilePath,
                allTypes:         make(map[string]TypeData),
                pollClient:       pollClient,
                mrAddress:        mrAddr,
@@ -74,7 +70,7 @@ func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPCli
        }
 }
 
-func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
+func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
        if err := jm.validateJobInfo(ji); err == nil {
                typeData := jm.allTypes[ji.InfoTypeIdentity]
                typeData.jobsHandler.addJobCh <- ji
@@ -85,7 +81,7 @@ func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
        }
 }
 
-func (jm *JobsManagerImpl) DeleteJob(jobId string) {
+func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
        for _, typeData := range jm.allTypes {
                log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
                typeData.jobsHandler.deleteJobCh <- jobId
@@ -107,26 +103,15 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
        return nil
 }
 
-func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
-       typeDefsByte, err := os.ReadFile(jm.configFile)
-       if err != nil {
-               return nil, err
-       }
-       typeDefs := struct {
-               Types []config.TypeDefinition `json:"types"`
-       }{}
-       err = json.Unmarshal(typeDefsByte, &typeDefs)
-       if err != nil {
-               return nil, err
-       }
-       for _, typeDef := range typeDefs.Types {
+func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
+       for _, typeDef := range types {
                jm.allTypes[typeDef.Id] = TypeData{
                        TypeId:        typeDef.Id,
                        DMaaPTopicURL: typeDef.DmaapTopicURL,
                        jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
                }
        }
-       return typeDefs.Types, nil
+       return types
 }
 
 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
@@ -137,10 +122,10 @@ func (jm *JobsManagerImpl) GetSupportedTypes() []string {
        return supportedTypes
 }
 
-func (jm *JobsManagerImpl) StartJobs() {
+func (jm *JobsManagerImpl) StartJobsForAllTypes() {
        for _, jobType := range jm.allTypes {
 
-               go jobType.jobsHandler.start(jm.mrAddress)
+               go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
 
        }
 }
@@ -168,7 +153,7 @@ func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPCl
        }
 }
 
-func (jh *jobsHandler) start(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
        go func() {
                for {
                        jh.pollAndDistributeMessages(mRAddress)