NONRTRIC - Enrichment Coordinator Service, Changed error codes
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "encoding/json"
25         "fmt"
26         "os"
27         "sync"
28
29         log "github.com/sirupsen/logrus"
30         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
31 )
32
33 type TypeDefinitions struct {
34         Types []TypeDefinition `json:"types"`
35 }
36 type TypeDefinition struct {
37         Id            string `json:"id"`
38         DmaapTopicURL string `json:"dmaapTopicUrl"`
39 }
40
41 type TypeData struct {
42         TypeId        string `json:"id"`
43         DMaaPTopicURL string `json:"dmaapTopicUrl"`
44         Jobs          map[string]JobInfo
45 }
46
47 type JobInfo struct {
48         Owner            string      `json:"owner"`
49         LastUpdated      string      `json:"last_updated"`
50         InfoJobIdentity  string      `json:"info_job_identity"`
51         TargetUri        string      `json:"target_uri"`
52         InfoJobData      interface{} `json:"info_job_data"`
53         InfoTypeIdentity string      `json:"info_type_identity"`
54 }
55
56 type JobHandler interface {
57         AddJob(JobInfo) error
58         DeleteJob(jobId string)
59 }
60
61 var (
62         mu         sync.Mutex
63         configFile = "configs/type_config.json"
64         Handler    JobHandler
65         allTypes   = make(map[string]TypeData)
66 )
67
68 func init() {
69         Handler = newJobHandlerImpl()
70 }
71
72 type jobHandlerImpl struct{}
73
74 func newJobHandlerImpl() *jobHandlerImpl {
75         return &jobHandlerImpl{}
76 }
77
78 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
79         mu.Lock()
80         defer mu.Unlock()
81         if err := validateJobInfo(ji); err == nil {
82                 jobs := allTypes[ji.InfoTypeIdentity].Jobs
83                 jobs[ji.InfoJobIdentity] = ji
84                 log.Debug("Added job: ", ji)
85                 return nil
86         } else {
87                 return err
88         }
89 }
90
91 func (jh *jobHandlerImpl) DeleteJob(jobId string) {
92         mu.Lock()
93         defer mu.Unlock()
94         for _, typeData := range allTypes {
95                 delete(typeData.Jobs, jobId)
96         }
97         log.Debug("Deleted job: ", jobId)
98 }
99
100 func validateJobInfo(ji JobInfo) error {
101         if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
102                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
103         }
104         if ji.InfoJobIdentity == "" {
105                 return fmt.Errorf("missing required job identity: %v", ji)
106         }
107         // Temporary for when there are only REST callbacks needed
108         if ji.TargetUri == "" {
109                 return fmt.Errorf("missing required target URI: %v", ji)
110         }
111         return nil
112 }
113
114 func GetTypes() ([]TypeData, error) {
115         mu.Lock()
116         defer mu.Unlock()
117         types := make([]TypeData, 0, 1)
118         typeDefsByte, err := os.ReadFile(configFile)
119         if err != nil {
120                 return nil, err
121         }
122         typeDefs := TypeDefinitions{}
123         err = json.Unmarshal(typeDefsByte, &typeDefs)
124         if err != nil {
125                 return nil, err
126         }
127         for _, typeDef := range typeDefs.Types {
128                 typeInfo := TypeData{
129                         TypeId:        typeDef.Id,
130                         DMaaPTopicURL: typeDef.DmaapTopicURL,
131                         Jobs:          make(map[string]JobInfo),
132                 }
133                 if _, ok := allTypes[typeInfo.TypeId]; !ok {
134                         allTypes[typeInfo.TypeId] = typeInfo
135                 }
136                 types = append(types, typeInfo)
137         }
138         return types, nil
139 }
140
141 func GetSupportedTypes() []string {
142         mu.Lock()
143         defer mu.Unlock()
144         supportedTypes := []string{}
145         for k := range allTypes {
146                 supportedTypes = append(supportedTypes, k)
147         }
148         return supportedTypes
149 }
150
151 func AddJob(job JobInfo) error {
152         return Handler.AddJob(job)
153 }
154
155 func DeleteJob(jobId string) {
156         Handler.DeleteJob(jobId)
157 }
158
159 func RunJobs(mRAddress string) {
160         for {
161                 pollAndDistributeMessages(mRAddress)
162         }
163 }
164
165 func pollAndDistributeMessages(mRAddress string) {
166         for typeId, typeInfo := range allTypes {
167                 log.Debugf("Processing jobs for type: %v", typeId)
168                 messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
169                 if error != nil {
170                         log.Warnf("Error getting data from MR. Cause: %v", error)
171                         continue
172                 }
173                 distributeMessages(messagesBody, typeInfo)
174         }
175 }
176
177 func distributeMessages(messages []byte, typeInfo TypeData) {
178         if len(messages) > 2 {
179                 mu.Lock()
180                 for _, jobInfo := range typeInfo.Jobs {
181                         go sendMessagesToConsumer(messages, jobInfo)
182                 }
183                 mu.Unlock()
184         }
185 }
186
187 func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
188         log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
189         if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
190                 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
191         }
192 }
193
194 func clearAll() {
195         allTypes = make(map[string]TypeData)
196 }