Update Dmaap topic
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "fmt"
25         "sync"
26         "time"
27
28         "github.com/confluentinc/confluent-kafka-go/kafka"
29         log "github.com/sirupsen/logrus"
30         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
31         "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
32         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
33 )
34
35 type TypeData struct {
36         Identity    string `json:"id"`
37         jobsHandler *jobsHandler
38 }
39
40 type JobInfo struct {
41         Owner            string     `json:"owner"`
42         LastUpdated      string     `json:"last_updated"`
43         InfoJobIdentity  string     `json:"info_job_identity"`
44         TargetUri        string     `json:"target_uri"`
45         InfoJobData      Parameters `json:"info_job_data"`
46         InfoTypeIdentity string     `json:"info_type_identity"`
47 }
48
49 type JobTypesManager interface {
50         LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
51         GetSupportedTypes() []string
52 }
53
54 type JobsManager interface {
55         AddJobFromRESTCall(JobInfo) error
56         DeleteJobFromRESTCall(jobId string)
57 }
58
59 type JobsManagerImpl struct {
60         allTypes         map[string]TypeData
61         pollClient       restclient.HTTPClient
62         mrAddress        string
63         kafkaFactory     kafkaclient.KafkaFactory
64         distributeClient restclient.HTTPClient
65 }
66
67 func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
68         return &JobsManagerImpl{
69                 allTypes:         make(map[string]TypeData),
70                 pollClient:       pollClient,
71                 mrAddress:        mrAddr,
72                 kafkaFactory:     kafkaFactory,
73                 distributeClient: distributeClient,
74         }
75 }
76
77 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
78         if err := jm.validateJobInfo(ji); err == nil {
79                 typeData := jm.allTypes[ji.InfoTypeIdentity]
80                 typeData.jobsHandler.addJobCh <- ji
81                 log.Debug("Added job: ", ji)
82                 return nil
83         } else {
84                 return err
85         }
86 }
87
88 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
89         for _, typeData := range jm.allTypes {
90                 log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
91                 typeData.jobsHandler.deleteJobCh <- jobId
92         }
93         log.Debug("Deleted job: ", jobId)
94 }
95
96 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
97         if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
98                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
99         }
100         if ji.InfoJobIdentity == "" {
101                 return fmt.Errorf("missing required job identity: %v", ji)
102         }
103         // Temporary for when there are only REST callbacks needed
104         if ji.TargetUri == "" {
105                 return fmt.Errorf("missing required target URI: %v", ji)
106         }
107         return nil
108 }
109
110 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
111         for _, typeDef := range types {
112                 if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
113                         log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
114                 }
115                 jm.allTypes[typeDef.Identity] = TypeData{
116                         Identity:    typeDef.Identity,
117                         jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
118                 }
119         }
120         return types
121 }
122
123 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
124         supportedTypes := []string{}
125         for k := range jm.allTypes {
126                 supportedTypes = append(supportedTypes, k)
127         }
128         return supportedTypes
129 }
130
131 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
132         for _, jobType := range jm.allTypes {
133
134                 go jobType.jobsHandler.startPollingAndDistribution()
135
136         }
137 }
138
139 type jobsHandler struct {
140         mu               sync.Mutex
141         typeId           string
142         pollingAgent     pollingAgent
143         jobs             map[string]job
144         addJobCh         chan JobInfo
145         deleteJobCh      chan string
146         distributeClient restclient.HTTPClient
147 }
148
149 func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
150         pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
151         return &jobsHandler{
152                 typeId:           typeDef.Identity,
153                 pollingAgent:     pollingAgent,
154                 jobs:             make(map[string]job),
155                 addJobCh:         make(chan JobInfo),
156                 deleteJobCh:      make(chan string),
157                 distributeClient: distributeClient,
158         }
159 }
160
161 func (jh *jobsHandler) startPollingAndDistribution() {
162         go func() {
163                 for {
164                         jh.pollAndDistributeMessages()
165                 }
166         }()
167
168         go func() {
169                 for {
170                         jh.monitorManagementChannels()
171                 }
172         }()
173 }
174
175 func (jh *jobsHandler) pollAndDistributeMessages() {
176         log.Debugf("Processing jobs for type: %v", jh.typeId)
177         messagesBody, error := jh.pollingAgent.pollMessages()
178         if error != nil {
179                 log.Warn("Error getting data from source. Cause: ", error)
180                 time.Sleep(time.Minute) // Must wait before trying to call data source again
181                 return
182         }
183         jh.distributeMessages(messagesBody)
184 }
185
186 func (jh *jobsHandler) distributeMessages(messages []byte) {
187         if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
188                 log.Debug("Distributing messages: ", string(messages))
189                 jh.mu.Lock()
190                 defer jh.mu.Unlock()
191                 for _, job := range jh.jobs {
192                         if len(job.messagesChannel) < cap(job.messagesChannel) {
193                                 job.messagesChannel <- messages
194                         } else {
195                                 jh.emptyMessagesBuffer(job)
196                         }
197                 }
198         }
199 }
200
201 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
202         log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
203 out:
204         for {
205                 select {
206                 case <-job.messagesChannel:
207                 default:
208                         break out
209                 }
210         }
211 }
212
213 func (jh *jobsHandler) monitorManagementChannels() {
214         select {
215         case addedJob := <-jh.addJobCh:
216                 jh.addJob(addedJob)
217         case deletedJob := <-jh.deleteJobCh:
218                 jh.deleteJob(deletedJob)
219         }
220 }
221
222 func (jh *jobsHandler) addJob(addedJob JobInfo) {
223         jh.mu.Lock()
224         log.Debug("Add job: ", addedJob)
225         newJob := newJob(addedJob, jh.distributeClient)
226         go newJob.start()
227         jh.jobs[addedJob.InfoJobIdentity] = newJob
228         jh.mu.Unlock()
229 }
230
231 func (jh *jobsHandler) deleteJob(deletedJob string) {
232         jh.mu.Lock()
233         log.Debug("Delete job: ", deletedJob)
234         j, exist := jh.jobs[deletedJob]
235         if exist {
236                 j.controlChannel <- struct{}{}
237                 delete(jh.jobs, deletedJob)
238         }
239         jh.mu.Unlock()
240 }
241
242 type pollingAgent interface {
243         pollMessages() ([]byte, error)
244 }
245
246 func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
247         if typeDef.DMaaPTopicURL != "" {
248                 return dMaaPPollingAgent{
249                         messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
250                         pollClient:       pollClient,
251                 }
252         } else {
253                 return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
254         }
255 }
256
257 type dMaaPPollingAgent struct {
258         messageRouterURL string
259         pollClient       restclient.HTTPClient
260 }
261
262 func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
263         return restclient.Get(pa.messageRouterURL, pa.pollClient)
264 }
265
266 type kafkaPollingAgent struct {
267         kafkaClient kafkaclient.KafkaClient
268 }
269
270 func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
271         c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
272         if err != nil {
273                 log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
274         }
275         return kafkaPollingAgent{
276                 kafkaClient: c,
277         }
278 }
279
280 func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
281         msg, err := pa.kafkaClient.ReadMessage()
282         if err == nil {
283                 return msg, nil
284         } else {
285                 if isKafkaTimedOutError(err) {
286                         return []byte(""), nil
287                 }
288                 return nil, err
289         }
290 }
291
292 func isKafkaTimedOutError(err error) bool {
293         kafkaErr, ok := err.(kafka.Error)
294         return ok && kafkaErr.Code() == kafka.ErrTimedOut
295 }
296
297 type job struct {
298         jobInfo         JobInfo
299         client          restclient.HTTPClient
300         messagesChannel chan []byte
301         controlChannel  chan struct{}
302 }
303
304 func newJob(j JobInfo, c restclient.HTTPClient) job {
305
306         return job{
307                 jobInfo:         j,
308                 client:          c,
309                 messagesChannel: make(chan []byte, 10),
310                 controlChannel:  make(chan struct{}),
311         }
312 }
313
314 type Parameters struct {
315         BufferTimeout BufferTimeout `json:"bufferTimeout"`
316 }
317
318 type BufferTimeout struct {
319         MaxSize            int   `json:"maxSize"`
320         MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
321 }
322
323 func (j *job) start() {
324         if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
325                 j.startReadingSingleMessages()
326         } else {
327                 j.startReadingMessagesBuffered()
328         }
329 }
330
331 func (j *job) startReadingSingleMessages() {
332 out:
333         for {
334                 select {
335                 case <-j.controlChannel:
336                         log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
337                         break out
338                 case msg := <-j.messagesChannel:
339                         j.sendMessagesToConsumer(msg)
340                 }
341         }
342 }
343
344 func (j *job) startReadingMessagesBuffered() {
345 out:
346         for {
347                 select {
348                 case <-j.controlChannel:
349                         log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
350                         break out
351                 default:
352                         msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
353                         if len(msgs) > 0 {
354                                 j.sendMessagesToConsumer(msgs)
355                         }
356                 }
357         }
358 }
359
360 func (j *job) read(bufferParams BufferTimeout) []byte {
361         wg := sync.WaitGroup{}
362         wg.Add(bufferParams.MaxSize)
363         var msgs []byte
364         c := make(chan struct{})
365         go func() {
366                 i := 0
367         out:
368                 for {
369                         select {
370                         case <-c:
371                                 break out
372                         case msg := <-j.messagesChannel:
373                                 i++
374                                 msgs = append(msgs, msg...)
375                                 wg.Done()
376                                 if i == bufferParams.MaxSize {
377                                         break out
378                                 }
379                         }
380                 }
381         }()
382         j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
383         close(c)
384         return msgs
385 }
386
387 func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
388         c := make(chan struct{})
389         go func() {
390                 defer close(c)
391                 wg.Wait()
392         }()
393         select {
394         case <-c:
395                 return false // completed normally
396         case <-time.After(timeout):
397                 return true // timed out
398         }
399 }
400
401 func (j *job) sendMessagesToConsumer(messages []byte) {
402         log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
403         if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
404                 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
405                 return
406         }
407         log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
408 }