Poll MR and send messages to consumers
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "encoding/json"
25         "fmt"
26         "os"
27         "path/filepath"
28         "strings"
29         "sync"
30
31         log "github.com/sirupsen/logrus"
32         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
33 )
34
35 type Type struct {
36         TypeId     string `json:"id"`
37         DMaaPTopic string `json:"dmaapTopic"`
38         Schema     string `json:"schema"`
39         Jobs       map[string]JobInfo
40 }
41
42 type JobInfo struct {
43         Owner            string `json:"owner"`
44         LastUpdated      string `json:"last_updated"`
45         InfoJobIdentity  string `json:"info_job_identity"`
46         TargetUri        string `json:"target_uri"`
47         InfoJobData      string `json:"info_job_data"`
48         InfoTypeIdentity string `json:"info_type_identity"`
49 }
50
51 type JobHandler interface {
52         AddJob(JobInfo) error
53 }
54
55 var (
56         mu      sync.Mutex
57         typeDir = "configs"
58         Handler JobHandler
59         allJobs = make(map[string]Type)
60 )
61
62 func init() {
63         Handler = newJobHandlerImpl()
64 }
65
66 type jobHandlerImpl struct{}
67
68 func newJobHandlerImpl() *jobHandlerImpl {
69         return &jobHandlerImpl{}
70 }
71
72 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
73         mu.Lock()
74         defer mu.Unlock()
75         if err := validateJobInfo(ji); err == nil {
76                 jobs := allJobs[ji.InfoTypeIdentity].Jobs
77                 jobs[ji.InfoJobIdentity] = ji
78                 return nil
79         } else {
80                 return err
81         }
82 }
83
84 func validateJobInfo(ji JobInfo) error {
85         if _, ok := allJobs[ji.InfoTypeIdentity]; !ok {
86                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
87         }
88         if ji.InfoJobIdentity == "" {
89                 return fmt.Errorf("missing required job identity: %v", ji)
90         }
91         // Temporary for when there are only REST callbacks needed
92         if ji.TargetUri == "" {
93                 return fmt.Errorf("missing required target URI: %v", ji)
94         }
95         return nil
96 }
97
98 func GetTypes() ([]*Type, error) {
99         mu.Lock()
100         defer mu.Unlock()
101         types := make([]*Type, 0, 1)
102         err := filepath.Walk(typeDir,
103                 func(path string, info os.FileInfo, err error) error {
104                         if err != nil {
105                                 return err
106                         }
107                         if strings.Contains(path, ".json") {
108                                 if jobType, err := getType(path); err == nil {
109                                         types = append(types, jobType)
110                                 }
111                         }
112                         return nil
113                 })
114         if err != nil {
115                 return nil, err
116         }
117         return types, nil
118 }
119
120 func GetSupportedTypes() []string {
121         mu.Lock()
122         defer mu.Unlock()
123         supportedTypes := []string{}
124         for k := range allJobs {
125                 supportedTypes = append(supportedTypes, k)
126         }
127         return supportedTypes
128 }
129
130 func AddJob(job JobInfo) error {
131         return Handler.AddJob(job)
132 }
133
134 func getType(path string) (*Type, error) {
135         if typeDefinition, err := os.ReadFile(path); err == nil {
136                 var dat map[string]interface{}
137                 if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
138                         schema, _ := json.Marshal(dat["schema"])
139                         typeInfo := Type{
140                                 TypeId:     dat["id"].(string),
141                                 DMaaPTopic: dat["dmaapTopic"].(string),
142                                 Schema:     string(schema),
143                                 Jobs:       make(map[string]JobInfo),
144                         }
145                         if _, ok := allJobs[typeInfo.TypeId]; !ok {
146                                 allJobs[typeInfo.TypeId] = typeInfo
147                         }
148                         return &typeInfo, nil
149                 } else {
150                         return nil, marshalError
151                 }
152         } else {
153                 return nil, err
154         }
155 }
156
157 func RunJobs(mRAddress string) {
158         for {
159                 pollAndDistributeMessages(mRAddress)
160         }
161 }
162
163 func pollAndDistributeMessages(mRAddress string) {
164         for typeId, typeInfo := range allJobs {
165                 log.Debugf("Processing jobs for type: %v", typeId)
166                 messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
167                 if error != nil {
168                         log.Warnf("Error getting data from MR. Cause: %v", error)
169                         continue
170                 }
171                 distributeMessages(messagesBody, typeInfo)
172         }
173 }
174
175 func distributeMessages(messages []byte, typeInfo Type) {
176         if len(messages) > 2 {
177                 mu.Lock()
178                 for _, jobInfo := range typeInfo.Jobs {
179                         go sendMessagesToConsumer(messages, jobInfo)
180                 }
181                 mu.Unlock()
182         }
183 }
184
185 func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
186         log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
187         if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
188                 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
189         }
190 }
191
192 func clearAll() {
193         allJobs = make(map[string]Type)
194 }