Merge "Add docker-compose files for dmaap-mediator"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "encoding/json"
25         "fmt"
26         "os"
27         "sync"
28
29         log "github.com/sirupsen/logrus"
30         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
31 )
32
33 type TypeDefinitions struct {
34         Types []TypeDefinition `json:"types"`
35 }
36 type TypeDefinition struct {
37         Id            string `json:"id"`
38         DmaapTopicURL string `json:"dmaapTopicUrl"`
39 }
40
41 type TypeData struct {
42         TypeId        string `json:"id"`
43         DMaaPTopicURL string `json:"dmaapTopicUrl"`
44         Jobs          map[string]JobInfo
45 }
46
47 type JobInfo struct {
48         Owner            string      `json:"owner"`
49         LastUpdated      string      `json:"last_updated"`
50         InfoJobIdentity  string      `json:"info_job_identity"`
51         TargetUri        string      `json:"target_uri"`
52         InfoJobData      interface{} `json:"info_job_data"`
53         InfoTypeIdentity string      `json:"info_type_identity"`
54 }
55
56 type JobHandler interface {
57         AddJob(JobInfo) error
58 }
59
60 var (
61         mu         sync.Mutex
62         configFile = "configs/type_config.json"
63         Handler    JobHandler
64         allTypes   = make(map[string]TypeData)
65 )
66
67 func init() {
68         Handler = newJobHandlerImpl()
69 }
70
71 type jobHandlerImpl struct{}
72
73 func newJobHandlerImpl() *jobHandlerImpl {
74         return &jobHandlerImpl{}
75 }
76
77 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
78         mu.Lock()
79         defer mu.Unlock()
80         if err := validateJobInfo(ji); err == nil {
81                 jobs := allTypes[ji.InfoTypeIdentity].Jobs
82                 jobs[ji.InfoJobIdentity] = ji
83                 log.Debug("Added job: ", ji)
84                 return nil
85         } else {
86                 return err
87         }
88 }
89
90 func validateJobInfo(ji JobInfo) error {
91         if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
92                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
93         }
94         if ji.InfoJobIdentity == "" {
95                 return fmt.Errorf("missing required job identity: %v", ji)
96         }
97         // Temporary for when there are only REST callbacks needed
98         if ji.TargetUri == "" {
99                 return fmt.Errorf("missing required target URI: %v", ji)
100         }
101         return nil
102 }
103
104 func GetTypes() ([]TypeData, error) {
105         mu.Lock()
106         defer mu.Unlock()
107         types := make([]TypeData, 0, 1)
108         typeDefsByte, err := os.ReadFile(configFile)
109         if err != nil {
110                 return nil, err
111         }
112         typeDefs := TypeDefinitions{}
113         err = json.Unmarshal(typeDefsByte, &typeDefs)
114         if err != nil {
115                 return nil, err
116         }
117         for _, typeDef := range typeDefs.Types {
118                 typeInfo := TypeData{
119                         TypeId:        typeDef.Id,
120                         DMaaPTopicURL: typeDef.DmaapTopicURL,
121                         Jobs:          make(map[string]JobInfo),
122                 }
123                 if _, ok := allTypes[typeInfo.TypeId]; !ok {
124                         allTypes[typeInfo.TypeId] = typeInfo
125                 }
126                 types = append(types, typeInfo)
127         }
128         return types, nil
129 }
130
131 func GetSupportedTypes() []string {
132         mu.Lock()
133         defer mu.Unlock()
134         supportedTypes := []string{}
135         for k := range allTypes {
136                 supportedTypes = append(supportedTypes, k)
137         }
138         return supportedTypes
139 }
140
141 func AddJob(job JobInfo) error {
142         return Handler.AddJob(job)
143 }
144
145 func RunJobs(mRAddress string) {
146         for {
147                 pollAndDistributeMessages(mRAddress)
148         }
149 }
150
151 func pollAndDistributeMessages(mRAddress string) {
152         for typeId, typeInfo := range allTypes {
153                 log.Debugf("Processing jobs for type: %v", typeId)
154                 messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
155                 if error != nil {
156                         log.Warnf("Error getting data from MR. Cause: %v", error)
157                         continue
158                 }
159                 distributeMessages(messagesBody, typeInfo)
160         }
161 }
162
163 func distributeMessages(messages []byte, typeInfo TypeData) {
164         if len(messages) > 2 {
165                 mu.Lock()
166                 for _, jobInfo := range typeInfo.Jobs {
167                         go sendMessagesToConsumer(messages, jobInfo)
168                 }
169                 mu.Unlock()
170         }
171 }
172
173 func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
174         log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
175         if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
176                 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
177         }
178 }
179
180 func clearAll() {
181         allTypes = make(map[string]TypeData)
182 }