867894f76f279ea073f4304cfc254e12db6c1297
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "fmt"
25         "sync"
26         "time"
27
28         log "github.com/sirupsen/logrus"
29         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
30         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
31 )
32
33 type TypeData struct {
34         TypeId        string `json:"id"`
35         DMaaPTopicURL string `json:"dmaapTopicUrl"`
36         jobsHandler   *jobsHandler
37 }
38
39 type JobInfo struct {
40         Owner            string      `json:"owner"`
41         LastUpdated      string      `json:"last_updated"`
42         InfoJobIdentity  string      `json:"info_job_identity"`
43         TargetUri        string      `json:"target_uri"`
44         InfoJobData      interface{} `json:"info_job_data"`
45         InfoTypeIdentity string      `json:"info_type_identity"`
46 }
47
48 type JobTypesManager interface {
49         LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
50         GetSupportedTypes() []string
51 }
52
53 type JobsManager interface {
54         AddJobFromRESTCall(JobInfo) error
55         DeleteJobFromRESTCall(jobId string)
56 }
57
58 type JobsManagerImpl struct {
59         allTypes         map[string]TypeData
60         pollClient       restclient.HTTPClient
61         mrAddress        string
62         distributeClient restclient.HTTPClient
63 }
64
65 func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
66         return &JobsManagerImpl{
67                 allTypes:         make(map[string]TypeData),
68                 pollClient:       pollClient,
69                 mrAddress:        mrAddr,
70                 distributeClient: distributeClient,
71         }
72 }
73
74 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
75         if err := jm.validateJobInfo(ji); err == nil {
76                 typeData := jm.allTypes[ji.InfoTypeIdentity]
77                 typeData.jobsHandler.addJobCh <- ji
78                 log.Debug("Added job: ", ji)
79                 return nil
80         } else {
81                 return err
82         }
83 }
84
85 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
86         for _, typeData := range jm.allTypes {
87                 log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
88                 typeData.jobsHandler.deleteJobCh <- jobId
89         }
90         log.Debug("Deleted job: ", jobId)
91 }
92
93 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
94         if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
95                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
96         }
97         if ji.InfoJobIdentity == "" {
98                 return fmt.Errorf("missing required job identity: %v", ji)
99         }
100         // Temporary for when there are only REST callbacks needed
101         if ji.TargetUri == "" {
102                 return fmt.Errorf("missing required target URI: %v", ji)
103         }
104         return nil
105 }
106
107 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
108         for _, typeDef := range types {
109                 jm.allTypes[typeDef.Id] = TypeData{
110                         TypeId:        typeDef.Id,
111                         DMaaPTopicURL: typeDef.DmaapTopicURL,
112                         jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
113                 }
114         }
115         return types
116 }
117
118 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
119         supportedTypes := []string{}
120         for k := range jm.allTypes {
121                 supportedTypes = append(supportedTypes, k)
122         }
123         return supportedTypes
124 }
125
126 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
127         for _, jobType := range jm.allTypes {
128
129                 go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
130
131         }
132 }
133
134 type jobsHandler struct {
135         mu               sync.Mutex
136         typeId           string
137         topicUrl         string
138         jobs             map[string]job
139         addJobCh         chan JobInfo
140         deleteJobCh      chan string
141         pollClient       restclient.HTTPClient
142         distributeClient restclient.HTTPClient
143 }
144
145 func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
146         return &jobsHandler{
147                 typeId:           typeId,
148                 topicUrl:         topicURL,
149                 jobs:             make(map[string]job),
150                 addJobCh:         make(chan JobInfo),
151                 deleteJobCh:      make(chan string),
152                 pollClient:       pollClient,
153                 distributeClient: distributeClient,
154         }
155 }
156
157 func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
158         go func() {
159                 for {
160                         jh.pollAndDistributeMessages(mRAddress)
161                 }
162         }()
163
164         go func() {
165                 for {
166                         jh.monitorManagementChannels()
167                 }
168         }()
169 }
170
171 func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
172         log.Debugf("Processing jobs for type: %v", jh.typeId)
173         messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
174         if error != nil {
175                 log.Warn("Error getting data from MR. Cause: ", error)
176                 time.Sleep(time.Minute) // Must wait before trying to call MR again
177         }
178         log.Debug("Received messages: ", string(messagesBody))
179         jh.distributeMessages(messagesBody)
180 }
181
182 func (jh *jobsHandler) distributeMessages(messages []byte) {
183         if len(messages) > 2 {
184                 jh.mu.Lock()
185                 defer jh.mu.Unlock()
186                 for _, job := range jh.jobs {
187                         if len(job.messagesChannel) < cap(job.messagesChannel) {
188                                 job.messagesChannel <- messages
189                         } else {
190                                 jh.emptyMessagesBuffer(job)
191                         }
192                 }
193         }
194 }
195
196 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
197         log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
198 out:
199         for {
200                 select {
201                 case <-job.messagesChannel:
202                 default:
203                         break out
204                 }
205         }
206 }
207
208 func (jh *jobsHandler) monitorManagementChannels() {
209         select {
210         case addedJob := <-jh.addJobCh:
211                 jh.addJob(addedJob)
212         case deletedJob := <-jh.deleteJobCh:
213                 jh.deleteJob(deletedJob)
214         }
215 }
216
217 func (jh *jobsHandler) addJob(addedJob JobInfo) {
218         jh.mu.Lock()
219         log.Debug("Add job: ", addedJob)
220         newJob := newJob(addedJob, jh.distributeClient)
221         go newJob.start()
222         jh.jobs[addedJob.InfoJobIdentity] = newJob
223         jh.mu.Unlock()
224 }
225
226 func (jh *jobsHandler) deleteJob(deletedJob string) {
227         jh.mu.Lock()
228         log.Debug("Delete job: ", deletedJob)
229         j, exist := jh.jobs[deletedJob]
230         if exist {
231                 j.controlChannel <- struct{}{}
232                 delete(jh.jobs, deletedJob)
233         }
234         jh.mu.Unlock()
235 }
236
237 type job struct {
238         jobInfo         JobInfo
239         client          restclient.HTTPClient
240         messagesChannel chan []byte
241         controlChannel  chan struct{}
242 }
243
244 func newJob(j JobInfo, c restclient.HTTPClient) job {
245         return job{
246                 jobInfo:         j,
247                 client:          c,
248                 messagesChannel: make(chan []byte, 10),
249                 controlChannel:  make(chan struct{}),
250         }
251 }
252
253 func (j *job) start() {
254 out:
255         for {
256                 select {
257                 case <-j.controlChannel:
258                         log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
259                         break out
260                 case msg := <-j.messagesChannel:
261                         j.sendMessagesToConsumer(msg)
262                 }
263         }
264 }
265
266 func (j *job) sendMessagesToConsumer(messages []byte) {
267         log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
268         if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
269                 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
270         }
271         log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
272 }