Fixing Sonar Code Smell for PMConverter
[nonrtric/plt/ranpm.git] / pm-file-converter / components / kafkacollector / kafkacollector.go
// -
//
//      ========================LICENSE_START=================================
//      O-RAN-SC
//      %%
//      Copyright (C) 2023: Nordix Foundation
//      %%
//      Licensed under the Apache License, Version 2.0 (the "License");
//      you may not use this file except in compliance with the License.
//      You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
//      Unless required by applicable law or agreed to in writing, software
//      distributed under the License is distributed on an "AS IS" BASIS,
//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//      See the License for the specific language governing permissions and
//      limitations under the License.
//      ========================LICENSE_END===================================
package kafkacollector

import (
        "context"
        "fmt"
        "main/common/dataTypes"
        "main/components/miniocollector"
        "os"
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
        jsoniter "github.com/json-iterator/go"
        log "github.com/sirupsen/logrus"
        "golang.org/x/oauth2/clientcredentials"
)

var credsGrantType = os.Getenv("CREDS_GRANT_TYPE")
var bootstrapServer = os.Getenv("KAFKA_SERVER")
var credsClientSecret = os.Getenv("CREDS_CLIENT_SECRET")
var credsClientID = os.Getenv("CREDS_CLIENT_ID")
var credsServiceURL = os.Getenv("AUTH_SERVICE_URL")

// Limiter - valid for all jobs
const parallelismLimiter = 100

var jobLimiterChan = make(chan struct{}, parallelismLimiter)

const typeLabel = " for type: "
const fetchTokenErrorMessage = "Cannot fetch token: "
const setTokenErrorMessage = "Cannot set token: "

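// StartTopicReader reads messages from the given kafka topic and forwards them on
// dataCh until an "EXIT" command arrives on controlCh. Consumer creation and topic
// subscription are retried once per second until they succeed.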
// This function intentionally has high cognitive complexity // NOSONAR
func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {

        log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)

        topicOk := false
        var c *kafka.Consumer
        running := true

        for !topicOk {
                select {
                case readerCtrl := <-controlCh:
                        if readerCtrl.Command == "EXIT" {
                                log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
                                dataCh <- nil // Signal to job handler
                                running = false
                                return
                        }
                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                        if c == nil {
                                c = createKafkaConsumer(typeId, gid, cid)
                                if c == nil {
                                        log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
                                } else {
                                        log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
                                }
                        }
                        if c != nil && !topicOk {
                                err := c.SubscribeTopics([]string{topic}, nil)
                                if err != nil {
                                        log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err)
                                } else {
                                        log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
                                        topicOk = true
                                }
                        }
                }
        }
        log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)

        var eventChan = make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-c.Events():
                                switch evt.(type) {
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", evt)
                                        token, err := FetchToken()
                                        if err != nil {
                                                log.Warning(fetchTokenErrorMessage, err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning(setTokenErrorMessage, setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
                                }

                        case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()

        go func() {
                for {
                        select {
                        case readerCtrl := <-controlCh:
                                if readerCtrl.Command == "EXIT" {
                                        eventChan <- 0
                                        log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
                                        dataCh <- nil // Signal to job handler
                                        defer c.Close()
                                        return
                                }
                        default:
                                ev := c.Poll(1000)
                                if ev == nil {
                                        log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic)
                                        continue
                                }
                                switch e := ev.(type) {
                                case *kafka.Message:
                                        var kmsg dataTypes.KafkaPayload
                                        kmsg.Msg = e

                                        // Commit the offset synchronously; if the commit fails the
                                        // message may be delivered again.
                                        c.Commit()

                                        dataCh <- &kmsg
                                        log.Debug("Reader msg: ", &kmsg)
                                        log.Debug("Reader - data_ch ", dataCh)
                                case kafka.Error:
                                        fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", ev)
                                        token, err := FetchToken()
                                        if err != nil {
                                                log.Warning(fetchTokenErrorMessage, err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning(setTokenErrorMessage, setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        fmt.Printf("Ignored %v\n", e)
                                }
                        }
                }
        }()
}

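// StartTopicWriter sends each payload received on dataCh to the topic named in the
// payload. A nil payload stops the writer. Failed sends are retried up to ten times
// with increasing backoff before the message is discarded.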
// This function intentionally has high cognitive complexity // NOSONAR
func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {

        var kafkaProducer *kafka.Producer

        running := true
        log.Info("Topic writer starting")

        // Wait for kafka producer to become available - and be prepared to exit the writer
        for kafkaProducer == nil {
                select {
                case writerCtl := <-controlCh:
                        if writerCtl.Command == "EXIT" {
                                // ignore cmd
                        }
                default:
                        kafkaProducer = startProducer()
                        if kafkaProducer == nil {
                                log.Debug("Could not start kafka producer - retrying")
                                time.Sleep(1 * time.Second)
                        } else {
                                log.Debug("Kafka producer started")
                        }
                }
        }

        var eventChan = make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-kafkaProducer.Events():
                                switch m := evt.(type) {
                                case *kafka.Message:
                                        if m.TopicPartition.Error != nil {
                                                log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
                                        } else {
                                                log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
                                        }
                                case kafka.Error:
                                        log.Debug("Dumping topic writer event, error: ", evt)
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New producer token needed: ", evt)
                                        token, err := FetchToken()
                                        if err != nil {
                                                log.Warning(fetchTokenErrorMessage, err)
                                                kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning(setTokenErrorMessage, setTokenError)
                                                        kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic writer event, unknown: ", evt)
                                }

                        case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()
        go func() {
                for {
                        select {
                        case writerCtl := <-controlCh:
                                if writerCtl.Command == "EXIT" {
                                        // ignore - wait for channel signal
                                }

                        case kmsg := <-dataCh:
                                if kmsg == nil {
                                        eventChan <- 0
                                        log.Info("Topic writer stopped by channel signal - start_topic_writer")
                                        defer kafkaProducer.Close()
                                        return
                                }

                                retries := 10
                                msgOk := false
                                var err error
                                for retry := 1; retry <= retries && !msgOk; retry++ {
                                        err = kafkaProducer.Produce(&kafka.Message{
                                                TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
                                                Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)

                                        if err == nil {
                                                msgOk = true
                                                log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
                                        } else {
                                                log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
                                                time.Sleep(time.Duration(retry) * time.Second)
                                        }
                                }
                                if !msgOk {
                                        log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
                                }
                        case <-time.After(1000 * time.Millisecond):
                                if !running {
                                        return
                                }
                        }
                }
        }()
}

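// createKafkaConsumer creates a plain text consumer, or a SASL OAUTHBEARER consumer
// when a credentials grant type is configured. Returns nil if the consumer cannot
// be created.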
// This function intentionally has high cognitive complexity // NOSONAR
func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
        var cm kafka.ConfigMap
        if credsGrantType == "" {
                log.Info("Creating kafka plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapServer,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                }
        } else {
                log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapServer,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                        "sasl.mechanism":     "OAUTHBEARER",
                        "security.protocol":  "SASL_PLAINTEXT",
                }
        }
        c, err := kafka.NewConsumer(&cm)
        if err != nil {
                log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
                return nil
        }

        log.Info("Created kafka consumer for type: ", typeId, " OK")
        return c
}

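// startProducer creates a plain text producer, or a SASL OAUTHBEARER producer when
// a credentials grant type is configured. Returns nil if the producer cannot be
// created.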
// NOSONAR
func startProducer() *kafka.Producer {
        log.Info("Creating kafka producer")

        var cm kafka.ConfigMap
        if credsGrantType == "" {
                log.Info("Creating kafka plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapServer,
                }
        } else {
                log.Info("Creating kafka SASL plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapServer,
                        "sasl.mechanism":    "OAUTHBEARER",
                        "security.protocol": "SASL_PLAINTEXT",
                }
        }

        p, err := kafka.NewProducer(&cm)
        if err != nil {
                log.Error("Cannot create kafka producer, ", err)
                return nil
        }
        return p
}

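// StartJobXmlFileData dispatches each file-collect event received on dataInCh as a
// parallel xml-to-json conversion job, bounded by jobLimiterChan. A nil payload
// stops the job.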
func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutCh chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {

        log.Info("Type job ", typeId, " started")
        topicList := make(map[string]string)
        topicList[typeId] = "json-file-ready-kp"
        topicList["PmData"] = "json-file-ready-kpadp"
        running := true
        for {
                select {
                case jobCtl := <-controlCh:
                        log.Debug("Type job ", typeId, " new cmd received ", jobCtl.Command)
                        switch jobCtl.Command {
                        case "EXIT":
                                // ignore cmd - handled by channel signal
                        }

                case msg := <-dataInCh:
                        if msg == nil {
                                log.Info("Type job ", typeId, " stopped by channel signal - start_job_xml_file_data")
                                running = false
                                return
                        }
                        jobLimiterChan <- struct{}{}
                        go runXmlJob(typeId, msg, "gz", dataOutCh, topicList, jobLimiterChan, fvolume, fsbucket)

                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                }
        }
}

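// runXmlJob converts one xml file event to json and publishes a FileDownloadedEvt,
// keyed by source name, to every topic in topicList.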
// This function intentionally has more parameters for legacy compatibility // NOSONAR
func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
        defer func() {
                <-jobLimiterChan
        }()
        start := time.Now()
        var evtData dataTypes.XmlFileEventHeader

        err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
        if err != nil {
                log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details: ", err)
                return
        }
        log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())

        start = time.Now()
        // XmlToJsonConv returns only the converted file name; there is no error to check here.
        newFn := miniocollector.XmlToJsonConv(&evtData)
        log.Debug("Converted file to json: ", newFn, " time: ", time.Since(start).String())

        var fde dataTypes.FileDownloadedEvt
        fde.Filename = newFn
        j, err := jsoniter.Marshal(fde)
        if err != nil {
                log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
                return
        }
        msg.Msg.Value = j
        msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
        log.Debug("Marshal file-collect event ", time.Since(start).String())
        log.Debug("Sending file-collect event to output topic(s): ", len(topicList))
        for _, v := range topicList {
                fmt.Println("Output Topic: " + v)
                kmsg := &dataTypes.KafkaPayload{Msg: msg.Msg, Topic: v}
                dataOutChannel <- kmsg
        }
}

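// FetchToken fetches an OAuth2 access token using the client credentials flow and
// wraps it as a kafka.OAuthBearerToken.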
// NOSONAR
func FetchToken() (*kafka.OAuthBearerToken, error) {
        log.Debug("Get token inline")
        conf := &clientcredentials.Config{
                ClientID:     credsClientID,
                ClientSecret: credsClientSecret,
                TokenURL:     credsServiceURL,
        }
        token, err := conf.Token(context.Background())
        if err != nil {
                log.Warning("Cannot fetch access token: ", err)
                return nil, err
        }
        extensions := map[string]string{}

        log.Debug("=====================================================")
        log.Debug("token: ", token)
        log.Debug("=====================================================")
        log.Debug("TokenValue: ", token.AccessToken)
        log.Debug("=====================================================")
        log.Debug("Expiration: ", token.Expiry)
        oauthBearerToken := kafka.OAuthBearerToken{
                TokenValue: token.AccessToken,
                Expiration: token.Expiry,
                Extensions: extensions,
        }

        return &oauthBearerToken, nil
}
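
For reference, a minimal sketch of how a job handler might wire these readers and writers together. The topic name, group/client ids, buffer sizes and the volume/bucket arguments below are illustrative assumptions, not part of this file:

package main

import (
        "main/common/dataTypes"
        "main/components/kafkacollector"
)

func main() {
        readerCtl := make(chan dataTypes.ReaderControl)
        writerCtl := make(chan dataTypes.WriterControl)
        jobCtl := make(chan dataTypes.JobControl)
        fromKafka := make(chan *dataTypes.KafkaPayload, 100)
        toKafka := make(chan *dataTypes.KafkaPayload, 100)

        // Blocks until a producer is available, then serves toKafka in the background.
        kafkacollector.StartTopicWriter(writerCtl, toKafka)
        go kafkacollector.StartTopicReader("xml-file-ready-kp", "PmData", readerCtl, fromKafka, "pm-group", "pm-client")
        go kafkacollector.StartJobXmlFileData("PmData", jobCtl, fromKafka, toKafka, "/pm-files", "pm-bucket")

        select {} // Block forever; real code would watch the control channels.
}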