Poll MR and send messages to consumers
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
index 7347178..eec59c3 100644 (file)
 package jobs
 
 import (
+       "encoding/json"
        "fmt"
        "os"
        "path/filepath"
        "strings"
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
 type Type struct {
-       TypeId string
-       Schema string
+       TypeId     string `json:"id"`
+       DMaaPTopic string `json:"dmaapTopic"`
+       Schema     string `json:"schema"`
+       Jobs       map[string]JobInfo
 }
 
 type JobInfo struct {
@@ -46,9 +53,10 @@ type JobHandler interface {
 }
 
 var (
+       mu      sync.Mutex
        typeDir = "configs"
        Handler JobHandler
-       allJobs = make(map[string]map[string]JobInfo)
+       allJobs = make(map[string]Type)
 )
 
 func init() {
@@ -62,8 +70,10 @@ func newJobHandlerImpl() *jobHandlerImpl {
 }
 
 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
+       mu.Lock()
+       defer mu.Unlock()
        if err := validateJobInfo(ji); err == nil {
-               jobs := allJobs[ji.InfoTypeIdentity]
+               jobs := allJobs[ji.InfoTypeIdentity].Jobs
                jobs[ji.InfoJobIdentity] = ji
                return nil
        } else {
@@ -86,6 +96,8 @@ func validateJobInfo(ji JobInfo) error {
 }
 
 func GetTypes() ([]*Type, error) {
+       mu.Lock()
+       defer mu.Unlock()
        types := make([]*Type, 0, 1)
        err := filepath.Walk(typeDir,
                func(path string, info os.FileInfo, err error) error {
@@ -106,6 +118,8 @@ func GetTypes() ([]*Type, error) {
 }
 
 func GetSupportedTypes() []string {
+       mu.Lock()
+       defer mu.Unlock()
        supportedTypes := []string{}
        for k := range allJobs {
                supportedTypes = append(supportedTypes, k)
@@ -118,23 +132,63 @@ func AddJob(job JobInfo) error {
 }
 
 func getType(path string) (*Type, error) {
-       fileName := filepath.Base(path)
-       typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
-
-       if typeSchema, err := os.ReadFile(path); err == nil {
-               typeInfo := Type{
-                       TypeId: typeName,
-                       Schema: string(typeSchema),
-               }
-               if _, ok := allJobs[typeName]; !ok {
-                       allJobs[typeName] = make(map[string]JobInfo)
+       if typeDefinition, err := os.ReadFile(path); err == nil {
+               var dat map[string]interface{}
+               if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
+                       schema, _ := json.Marshal(dat["schema"])
+                       typeInfo := Type{
+                               TypeId:     dat["id"].(string),
+                               DMaaPTopic: dat["dmaapTopic"].(string),
+                               Schema:     string(schema),
+                               Jobs:       make(map[string]JobInfo),
+                       }
+                       if _, ok := allJobs[typeInfo.TypeId]; !ok {
+                               allJobs[typeInfo.TypeId] = typeInfo
+                       }
+                       return &typeInfo, nil
+               } else {
+                       return nil, marshalError
                }
-               return &typeInfo, nil
        } else {
                return nil, err
        }
 }
 
+func RunJobs(mRAddress string) {
+       for {
+               pollAndDistributeMessages(mRAddress)
+       }
+}
+
+func pollAndDistributeMessages(mRAddress string) {
+       for typeId, typeInfo := range allJobs {
+               log.Debugf("Processing jobs for type: %v", typeId)
+               messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
+               if error != nil {
+                       log.Warnf("Error getting data from MR. Cause: %v", error)
+                       continue
+               }
+               distributeMessages(messagesBody, typeInfo)
+       }
+}
+
+func distributeMessages(messages []byte, typeInfo Type) {
+       if len(messages) > 2 {
+               mu.Lock()
+               for _, jobInfo := range typeInfo.Jobs {
+                       go sendMessagesToConsumer(messages, jobInfo)
+               }
+               mu.Unlock()
+       }
+}
+
+func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+       log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
+       if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
+               log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+       }
+}
+
 func clearAll() {
-       allJobs = make(map[string]map[string]JobInfo)
+       allJobs = make(map[string]Type)
 }