Merge "First version of ODU slice assurance usecase"
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "fmt"
25         "sync"
26
27         log "github.com/sirupsen/logrus"
28         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
29         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
30 )
31
32 type TypeData struct {
33         TypeId        string `json:"id"`
34         DMaaPTopicURL string `json:"dmaapTopicUrl"`
35         jobsHandler   *jobsHandler
36 }
37
38 type JobInfo struct {
39         Owner            string      `json:"owner"`
40         LastUpdated      string      `json:"last_updated"`
41         InfoJobIdentity  string      `json:"info_job_identity"`
42         TargetUri        string      `json:"target_uri"`
43         InfoJobData      interface{} `json:"info_job_data"`
44         InfoTypeIdentity string      `json:"info_type_identity"`
45 }
46
47 type JobTypesManager interface {
48         LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
49         GetSupportedTypes() []string
50 }
51
52 type JobsManager interface {
53         AddJobFromRESTCall(JobInfo) error
54         DeleteJobFromRESTCall(jobId string)
55 }
56
57 type JobsManagerImpl struct {
58         allTypes         map[string]TypeData
59         pollClient       restclient.HTTPClient
60         mrAddress        string
61         distributeClient restclient.HTTPClient
62 }
63
64 func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
65         return &JobsManagerImpl{
66                 allTypes:         make(map[string]TypeData),
67                 pollClient:       pollClient,
68                 mrAddress:        mrAddr,
69                 distributeClient: distributeClient,
70         }
71 }
72
73 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
74         if err := jm.validateJobInfo(ji); err == nil {
75                 typeData := jm.allTypes[ji.InfoTypeIdentity]
76                 typeData.jobsHandler.addJobCh <- ji
77                 log.Debug("Added job: ", ji)
78                 return nil
79         } else {
80                 return err
81         }
82 }
83
84 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
85         for _, typeData := range jm.allTypes {
86                 log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
87                 typeData.jobsHandler.deleteJobCh <- jobId
88         }
89         log.Debug("Deleted job: ", jobId)
90 }
91
92 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
93         if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
94                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
95         }
96         if ji.InfoJobIdentity == "" {
97                 return fmt.Errorf("missing required job identity: %v", ji)
98         }
99         // Temporary for when there are only REST callbacks needed
100         if ji.TargetUri == "" {
101                 return fmt.Errorf("missing required target URI: %v", ji)
102         }
103         return nil
104 }
105
106 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
107         for _, typeDef := range types {
108                 jm.allTypes[typeDef.Id] = TypeData{
109                         TypeId:        typeDef.Id,
110                         DMaaPTopicURL: typeDef.DmaapTopicURL,
111                         jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
112                 }
113         }
114         return types
115 }
116
117 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
118         supportedTypes := []string{}
119         for k := range jm.allTypes {
120                 supportedTypes = append(supportedTypes, k)
121         }
122         return supportedTypes
123 }
124
125 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
126         for _, jobType := range jm.allTypes {
127
128                 go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
129
130         }
131 }
132
133 type jobsHandler struct {
134         mu               sync.Mutex
135         typeId           string
136         topicUrl         string
137         jobs             map[string]job
138         addJobCh         chan JobInfo
139         deleteJobCh      chan string
140         pollClient       restclient.HTTPClient
141         distributeClient restclient.HTTPClient
142 }
143
144 func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
145         return &jobsHandler{
146                 typeId:           typeId,
147                 topicUrl:         topicURL,
148                 jobs:             make(map[string]job),
149                 addJobCh:         make(chan JobInfo),
150                 deleteJobCh:      make(chan string),
151                 pollClient:       pollClient,
152                 distributeClient: distributeClient,
153         }
154 }
155
156 func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
157         go func() {
158                 for {
159                         jh.pollAndDistributeMessages(mRAddress)
160                 }
161         }()
162
163         go func() {
164                 for {
165                         jh.monitorManagementChannels()
166                 }
167         }()
168 }
169
170 func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
171         log.Debugf("Processing jobs for type: %v", jh.typeId)
172         messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
173         if error != nil {
174                 log.Warn("Error getting data from MR. Cause: ", error)
175         }
176         log.Debug("Received messages: ", string(messagesBody))
177         jh.distributeMessages(messagesBody)
178 }
179
180 func (jh *jobsHandler) distributeMessages(messages []byte) {
181         if len(messages) > 2 {
182                 jh.mu.Lock()
183                 defer jh.mu.Unlock()
184                 for _, job := range jh.jobs {
185                         if len(job.messagesChannel) < cap(job.messagesChannel) {
186                                 job.messagesChannel <- messages
187                         } else {
188                                 jh.emptyMessagesBuffer(job)
189                         }
190                 }
191         }
192 }
193
194 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
195         log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
196 out:
197         for {
198                 select {
199                 case <-job.messagesChannel:
200                 default:
201                         break out
202                 }
203         }
204 }
205
206 func (jh *jobsHandler) monitorManagementChannels() {
207         select {
208         case addedJob := <-jh.addJobCh:
209                 jh.addJob(addedJob)
210         case deletedJob := <-jh.deleteJobCh:
211                 jh.deleteJob(deletedJob)
212         }
213 }
214
215 func (jh *jobsHandler) addJob(addedJob JobInfo) {
216         jh.mu.Lock()
217         log.Debug("Add job: ", addedJob)
218         newJob := newJob(addedJob, jh.distributeClient)
219         go newJob.start()
220         jh.jobs[addedJob.InfoJobIdentity] = newJob
221         jh.mu.Unlock()
222 }
223
224 func (jh *jobsHandler) deleteJob(deletedJob string) {
225         jh.mu.Lock()
226         log.Debug("Delete job: ", deletedJob)
227         j, exist := jh.jobs[deletedJob]
228         if exist {
229                 j.controlChannel <- struct{}{}
230                 delete(jh.jobs, deletedJob)
231         }
232         jh.mu.Unlock()
233 }
234
235 type job struct {
236         jobInfo         JobInfo
237         client          restclient.HTTPClient
238         messagesChannel chan []byte
239         controlChannel  chan struct{}
240 }
241
242 func newJob(j JobInfo, c restclient.HTTPClient) job {
243         return job{
244                 jobInfo:         j,
245                 client:          c,
246                 messagesChannel: make(chan []byte, 10),
247                 controlChannel:  make(chan struct{}),
248         }
249 }
250
251 func (j *job) start() {
252 out:
253         for {
254                 select {
255                 case <-j.controlChannel:
256                         log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
257                         break out
258                 case msg := <-j.messagesChannel:
259                         j.sendMessagesToConsumer(msg)
260                 }
261         }
262 }
263
264 func (j *job) sendMessagesToConsumer(messages []byte) {
265         log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
266         if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
267                 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
268         }
269         log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
270 }