86bfe05c334963d13fda4744aacd59d7923dbefe
[nonrtric.git] / dmaap-mediator-producer / internal / jobs / jobs.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package jobs
22
23 import (
24         "fmt"
25         "strings"
26         "sync"
27         "time"
28
29         "github.com/confluentinc/confluent-kafka-go/kafka"
30         log "github.com/sirupsen/logrus"
31         "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
32         "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
33         "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
34 )
35
36 type TypeData struct {
37         Identity    string `json:"id"`
38         jobsHandler *jobsHandler
39 }
40
41 type sourceType string
42
43 const dMaaPSource = sourceType("dmaap")
44 const kafkaSource = sourceType("kafka")
45
46 type JobInfo struct {
47         Owner            string     `json:"owner"`
48         LastUpdated      string     `json:"last_updated"`
49         InfoJobIdentity  string     `json:"info_job_identity"`
50         TargetUri        string     `json:"target_uri"`
51         InfoJobData      Parameters `json:"info_job_data"`
52         InfoTypeIdentity string     `json:"info_type_identity"`
53         sourceType       sourceType
54 } // @name JobInfo
55
56 type JobTypesManager interface {
57         LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
58         GetSupportedTypes() []string
59 }
60
61 type JobsManager interface {
62         AddJobFromRESTCall(JobInfo) error
63         DeleteJobFromRESTCall(jobId string)
64 }
65
66 type JobsManagerImpl struct {
67         allTypes         map[string]TypeData
68         pollClient       restclient.HTTPClient
69         mrAddress        string
70         kafkaFactory     kafkaclient.KafkaFactory
71         distributeClient restclient.HTTPClient
72 }
73
74 func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
75         return &JobsManagerImpl{
76                 allTypes:         make(map[string]TypeData),
77                 pollClient:       pollClient,
78                 mrAddress:        mrAddr,
79                 kafkaFactory:     kafkaFactory,
80                 distributeClient: distributeClient,
81         }
82 }
83
84 func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
85         if err := jm.validateJobInfo(ji); err == nil {
86                 typeData := jm.allTypes[ji.InfoTypeIdentity]
87                 ji.sourceType = typeData.jobsHandler.sourceType
88                 typeData.jobsHandler.addJobCh <- ji
89                 log.Debug("Added job: ", ji)
90                 return nil
91         } else {
92                 return err
93         }
94 }
95
96 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
97         for _, typeData := range jm.allTypes {
98                 log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
99                 typeData.jobsHandler.deleteJobCh <- jobId
100         }
101         log.Debug("Deleted job: ", jobId)
102 }
103
104 func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
105         if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
106                 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
107         }
108         if ji.InfoJobIdentity == "" {
109                 return fmt.Errorf("missing required job identity: %v", ji)
110         }
111         // Temporary for when there are only REST callbacks needed
112         if ji.TargetUri == "" {
113                 return fmt.Errorf("missing required target URI: %v", ji)
114         }
115         return nil
116 }
117
118 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
119         for _, typeDef := range types {
120                 if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
121                         log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
122                 }
123                 jm.allTypes[typeDef.Identity] = TypeData{
124                         Identity:    typeDef.Identity,
125                         jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
126                 }
127         }
128         return types
129 }
130
131 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
132         supportedTypes := []string{}
133         for k := range jm.allTypes {
134                 supportedTypes = append(supportedTypes, k)
135         }
136         return supportedTypes
137 }
138
139 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
140         for _, jobType := range jm.allTypes {
141
142                 go jobType.jobsHandler.startPollingAndDistribution()
143
144         }
145 }
146
147 type jobsHandler struct {
148         mu               sync.Mutex
149         typeId           string
150         sourceType       sourceType
151         pollingAgent     pollingAgent
152         jobs             map[string]job
153         addJobCh         chan JobInfo
154         deleteJobCh      chan string
155         distributeClient restclient.HTTPClient
156 }
157
158 func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
159         pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
160         sourceType := kafkaSource
161         if typeDef.DMaaPTopicURL != "" {
162                 sourceType = dMaaPSource
163         }
164         return &jobsHandler{
165                 typeId:           typeDef.Identity,
166                 sourceType:       sourceType,
167                 pollingAgent:     pollingAgent,
168                 jobs:             make(map[string]job),
169                 addJobCh:         make(chan JobInfo),
170                 deleteJobCh:      make(chan string),
171                 distributeClient: distributeClient,
172         }
173 }
174
175 func (jh *jobsHandler) startPollingAndDistribution() {
176         go func() {
177                 for {
178                         jh.pollAndDistributeMessages()
179                 }
180         }()
181
182         go func() {
183                 for {
184                         jh.monitorManagementChannels()
185                 }
186         }()
187 }
188
189 func (jh *jobsHandler) pollAndDistributeMessages() {
190         log.Debugf("Processing jobs for type: %v", jh.typeId)
191         messagesBody, error := jh.pollingAgent.pollMessages()
192         if error != nil {
193                 log.Warn("Error getting data from source. Cause: ", error)
194                 time.Sleep(time.Minute) // Must wait before trying to call data source again
195                 return
196         }
197         jh.distributeMessages(messagesBody)
198 }
199
200 func (jh *jobsHandler) distributeMessages(messages []byte) {
201         if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
202                 log.Debug("Distributing messages: ", string(messages))
203                 jh.mu.Lock()
204                 defer jh.mu.Unlock()
205                 for _, job := range jh.jobs {
206                         if len(job.messagesChannel) < cap(job.messagesChannel) {
207                                 job.messagesChannel <- messages
208                         } else {
209                                 jh.emptyMessagesBuffer(job)
210                         }
211                 }
212         }
213 }
214
215 func (jh *jobsHandler) emptyMessagesBuffer(job job) {
216         log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
217 out:
218         for {
219                 select {
220                 case <-job.messagesChannel:
221                 default:
222                         break out
223                 }
224         }
225 }
226
227 func (jh *jobsHandler) monitorManagementChannels() {
228         select {
229         case addedJob := <-jh.addJobCh:
230                 jh.addJob(addedJob)
231         case deletedJob := <-jh.deleteJobCh:
232                 jh.deleteJob(deletedJob)
233         }
234 }
235
236 func (jh *jobsHandler) addJob(addedJob JobInfo) {
237         jh.mu.Lock()
238         log.Debug("Add job: ", addedJob)
239         newJob := newJob(addedJob, jh.distributeClient)
240         go newJob.start()
241         jh.jobs[addedJob.InfoJobIdentity] = newJob
242         jh.mu.Unlock()
243 }
244
245 func (jh *jobsHandler) deleteJob(deletedJob string) {
246         jh.mu.Lock()
247         log.Debug("Delete job: ", deletedJob)
248         j, exist := jh.jobs[deletedJob]
249         if exist {
250                 j.controlChannel <- struct{}{}
251                 delete(jh.jobs, deletedJob)
252         }
253         jh.mu.Unlock()
254 }
255
256 type pollingAgent interface {
257         pollMessages() ([]byte, error)
258 }
259
260 func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
261         if typeDef.DMaaPTopicURL != "" {
262                 return dMaaPPollingAgent{
263                         messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
264                         pollClient:       pollClient,
265                 }
266         } else {
267                 return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
268         }
269 }
270
271 type dMaaPPollingAgent struct {
272         messageRouterURL string
273         pollClient       restclient.HTTPClient
274 }
275
276 func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
277         return restclient.Get(pa.messageRouterURL, pa.pollClient)
278 }
279
280 type kafkaPollingAgent struct {
281         kafkaClient kafkaclient.KafkaClient
282 }
283
284 func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
285         c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
286         if err != nil {
287                 log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
288         }
289         return kafkaPollingAgent{
290                 kafkaClient: c,
291         }
292 }
293
294 func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
295         msg, err := pa.kafkaClient.ReadMessage()
296         if err == nil {
297                 return msg, nil
298         } else {
299                 if isKafkaTimedOutError(err) {
300                         return []byte(""), nil
301                 }
302                 return nil, err
303         }
304 }
305
306 func isKafkaTimedOutError(err error) bool {
307         kafkaErr, ok := err.(kafka.Error)
308         return ok && kafkaErr.Code() == kafka.ErrTimedOut
309 }
310
311 type job struct {
312         jobInfo         JobInfo
313         client          restclient.HTTPClient
314         messagesChannel chan []byte
315         controlChannel  chan struct{}
316 }
317
318 func newJob(j JobInfo, c restclient.HTTPClient) job {
319
320         return job{
321                 jobInfo:         j,
322                 client:          c,
323                 messagesChannel: make(chan []byte, 10),
324                 controlChannel:  make(chan struct{}),
325         }
326 }
327
328 type Parameters struct {
329         BufferTimeout BufferTimeout `json:"bufferTimeout"`
330 } // @name Parameters
331
332 type BufferTimeout struct {
333         MaxSize            int   `json:"maxSize"`
334         MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
335 } // @name BufferTimeout
336
337 func (j *job) start() {
338         if j.isJobBuffered() {
339                 j.startReadingMessagesBuffered()
340         } else {
341                 j.startReadingSingleMessages()
342         }
343 }
344
345 func (j *job) startReadingSingleMessages() {
346 out:
347         for {
348                 select {
349                 case <-j.controlChannel:
350                         log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
351                         break out
352                 case msg := <-j.messagesChannel:
353                         j.sendMessagesToConsumer(msg)
354                 }
355         }
356 }
357
358 func (j *job) startReadingMessagesBuffered() {
359 out:
360         for {
361                 select {
362                 case <-j.controlChannel:
363                         log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
364                         break out
365                 default:
366                         msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
367                         if len(msgs) > 0 {
368                                 j.sendMessagesToConsumer(msgs)
369                         }
370                 }
371         }
372 }
373
374 func (j *job) read(bufferParams BufferTimeout) []byte {
375         wg := sync.WaitGroup{}
376         wg.Add(bufferParams.MaxSize)
377         rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
378         c := make(chan struct{})
379         go func() {
380                 i := 0
381         out:
382                 for {
383                         select {
384                         case <-c:
385                                 break out
386                         case msg := <-j.messagesChannel:
387                                 rawMsgs = append(rawMsgs, msg)
388                                 i++
389                                 wg.Done()
390                                 if i == bufferParams.MaxSize {
391                                         break out
392                                 }
393                         }
394                 }
395         }()
396         j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
397         close(c)
398         return getAsJSONArray(rawMsgs)
399 }
400
401 func getAsJSONArray(rawMsgs [][]byte) []byte {
402         if len(rawMsgs) == 0 {
403                 return []byte("")
404         }
405         strings := ""
406         for i := 0; i < len(rawMsgs); i++ {
407                 strings = strings + makeIntoString(rawMsgs[i])
408                 strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
409         }
410         return []byte(wrapInJSONArray(strings))
411 }
412
413 func makeIntoString(rawMsg []byte) string {
414         return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
415 }
416
417 func addSeparatorIfNeeded(strings string, position, length int) string {
418         if position < length-1 {
419                 strings = strings + ","
420         }
421         return strings
422 }
423
424 func wrapInJSONArray(strings string) string {
425         return "[" + strings + "]"
426 }
427
428 func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
429         c := make(chan struct{})
430         go func() {
431                 defer close(c)
432                 wg.Wait()
433         }()
434         select {
435         case <-c:
436                 return false // completed normally
437         case <-time.After(timeout):
438                 return true // timed out
439         }
440 }
441
442 func (j *job) sendMessagesToConsumer(messages []byte) {
443         log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
444         contentType := restclient.ContentTypeJSON
445         if j.isJobKafka() && !j.isJobBuffered() {
446                 contentType = restclient.ContentTypePlain
447         }
448         if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
449                 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
450                 return
451         }
452         log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
453 }
454
455 func (j *job) isJobBuffered() bool {
456         return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
457 }
458
459 func (j *job) isJobKafka() bool {
460         return j.jobInfo.sourceType == kafkaSource
461 }