Merge "Use env varialbes to replace image urls & tags"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "encoding/json"
25         "fmt"
26         "os"
27         "sync"
28
29         log "github.com/sirupsen/logrus"
30         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
31         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
32 )
33
34 type TypeData struct {
35         TypeId        string `json:"id"`
36         DMaaPTopicURL string `json:"dmaapTopicUrl"`
37         jobsHandler   *jobsHandler
38 }
39
40 type JobInfo struct {
41         Owner            string      `json:"owner"`
42         LastUpdated      string      `json:"last_updated"`
43         InfoJobIdentity  string      `json:"info_job_identity"`
44         TargetUri        string      `json:"target_uri"`
45         InfoJobData      interface{} `json:"info_job_data"`
46         InfoTypeIdentity string      `json:"info_type_identity"`
47 }
48
49 type JobTypesManager interface {
50         LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
51         GetSupportedTypes() []string
52 }
53
54 type JobsManager interface {
55         AddJobFromRESTCall(JobInfo) error
56         DeleteJobFromRESTCall(jobId string)
57 }
58
59 type JobsManagerImpl struct {
60         configFile       string
61         allTypes         map[string]TypeData
62         pollClient       restclient.HTTPClient
63         mrAddress        string
64         distributeClient restclient.HTTPClient
65 }
66
67 func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
68         return &JobsManagerImpl{
69                 configFile:       typeConfigFilePath,
70                 allTypes:         make(map[string]TypeData),
71                 pollClient:       pollClient,
72                 mrAddress:        mrAddr,
73                 distributeClient: distributeClient,
74         }
75 }
76
77 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
78         if err := jm.validateJobInfo(ji); err == nil {
79                 typeData := jm.allTypes[ji.InfoTypeIdentity]
80                 typeData.jobsHandler.addJobCh <- ji
81                 log.Debug("Added job: ", ji)
82                 return nil
83         } else {
84                 return err
85         }
86 }
87
88 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
89         for _, typeData := range jm.allTypes {
90                 log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
91                 typeData.jobsHandler.deleteJobCh <- jobId
92         }
93         log.Debug("Deleted job: ", jobId)
94 }
95
96 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
97         if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
98                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
99         }
100         if ji.InfoJobIdentity == "" {
101                 return fmt.Errorf("missing required job identity: %v", ji)
102         }
103         // Temporary for when there are only REST callbacks needed
104         if ji.TargetUri == "" {
105                 return fmt.Errorf("missing required target URI: %v", ji)
106         }
107         return nil
108 }
109
110 func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
111         typeDefsByte, err := os.ReadFile(jm.configFile)
112         if err != nil {
113                 return nil, err
114         }
115         typeDefs := struct {
116                 Types []config.TypeDefinition `json:"types"`
117         }{}
118         err = json.Unmarshal(typeDefsByte, &typeDefs)
119         if err != nil {
120                 return nil, err
121         }
122         for _, typeDef := range typeDefs.Types {
123                 jm.allTypes[typeDef.Id] = TypeData{
124                         TypeId:        typeDef.Id,
125                         DMaaPTopicURL: typeDef.DmaapTopicURL,
126                         jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
127                 }
128         }
129         return typeDefs.Types, nil
130 }
131
132 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
133         supportedTypes := []string{}
134         for k := range jm.allTypes {
135                 supportedTypes = append(supportedTypes, k)
136         }
137         return supportedTypes
138 }
139
140 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
141         for _, jobType := range jm.allTypes {
142
143                 go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
144
145         }
146 }
147
148 type jobsHandler struct {
149         mu               sync.Mutex
150         typeId           string
151         topicUrl         string
152         jobs             map[string]job
153         addJobCh         chan JobInfo
154         deleteJobCh      chan string
155         pollClient       restclient.HTTPClient
156         distributeClient restclient.HTTPClient
157 }
158
159 func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
160         return &jobsHandler{
161                 typeId:           typeId,
162                 topicUrl:         topicURL,
163                 jobs:             make(map[string]job),
164                 addJobCh:         make(chan JobInfo),
165                 deleteJobCh:      make(chan string),
166                 pollClient:       pollClient,
167                 distributeClient: distributeClient,
168         }
169 }
170
171 func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
172         go func() {
173                 for {
174                         jh.pollAndDistributeMessages(mRAddress)
175                 }
176         }()
177
178         go func() {
179                 for {
180                         jh.monitorManagementChannels()
181                 }
182         }()
183 }
184
185 func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
186         log.Debugf("Processing jobs for type: %v", jh.typeId)
187         messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
188         if error != nil {
189                 log.Warn("Error getting data from MR. Cause: ", error)
190         }
191         log.Debug("Received messages: ", string(messagesBody))
192         jh.distributeMessages(messagesBody)
193 }
194
195 func (jh *jobsHandler) distributeMessages(messages []byte) {
196         if len(messages) > 2 {
197                 jh.mu.Lock()
198                 defer jh.mu.Unlock()
199                 for _, job := range jh.jobs {
200                         if len(job.messagesChannel) < cap(job.messagesChannel) {
201                                 job.messagesChannel <- messages
202                         } else {
203                                 jh.emptyMessagesBuffer(job)
204                         }
205                 }
206         }
207 }
208
209 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
210         log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
211 out:
212         for {
213                 select {
214                 case <-job.messagesChannel:
215                 default:
216                         break out
217                 }
218         }
219 }
220
221 func (jh *jobsHandler) monitorManagementChannels() {
222         select {
223         case addedJob := <-jh.addJobCh:
224                 jh.addJob(addedJob)
225         case deletedJob := <-jh.deleteJobCh:
226                 jh.deleteJob(deletedJob)
227         }
228 }
229
230 func (jh *jobsHandler) addJob(addedJob JobInfo) {
231         jh.mu.Lock()
232         log.Debug("Add job: ", addedJob)
233         newJob := newJob(addedJob, jh.distributeClient)
234         go newJob.start()
235         jh.jobs[addedJob.InfoJobIdentity] = newJob
236         jh.mu.Unlock()
237 }
238
239 func (jh *jobsHandler) deleteJob(deletedJob string) {
240         jh.mu.Lock()
241         log.Debug("Delete job: ", deletedJob)
242         j, exist := jh.jobs[deletedJob]
243         if exist {
244                 j.controlChannel <- struct{}{}
245                 delete(jh.jobs, deletedJob)
246         }
247         jh.mu.Unlock()
248 }
249
250 type job struct {
251         jobInfo         JobInfo
252         client          restclient.HTTPClient
253         messagesChannel chan []byte
254         controlChannel  chan struct{}
255 }
256
257 func newJob(j JobInfo, c restclient.HTTPClient) job {
258         return job{
259                 jobInfo:         j,
260                 client:          c,
261                 messagesChannel: make(chan []byte, 10),
262                 controlChannel:  make(chan struct{}),
263         }
264 }
265
266 func (j *job) start() {
267 out:
268         for {
269                 select {
270                 case <-j.controlChannel:
271                         log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
272                         break out
273                 case msg := <-j.messagesChannel:
274                         j.sendMessagesToConsumer(msg)
275                 }
276         }
277 }
278
279 func (j *job) sendMessagesToConsumer(messages []byte) {
280         log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
281         if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
282                 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
283         }
284         log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
285 }