// -
//
//      ========================LICENSE_START=================================
//      O-RAN-SC
//      %%
//      Copyright (C) 2023: Nordix Foundation
//      %%
//      Licensed under the Apache License, Version 2.0 (the "License");
//      you may not use this file except in compliance with the License.
//      You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
//      Unless required by applicable law or agreed to in writing, software
//      distributed under the License is distributed on an "AS IS" BASIS,
//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//      See the License for the specific language governing permissions and
//      limitations under the License.
//      ========================LICENSE_END===================================

//nolint:gocognit
package kafkacollector

import (
        "context"
        "fmt"
        "main/common/dataTypes"
        "main/components/miniocollector"
        "os"
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
        jsoniter "github.com/json-iterator/go"
        log "github.com/sirupsen/logrus"
        "golang.org/x/oauth2/clientcredentials"
)

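// Configuration is read from the environment at startup. An empty
// CREDS_GRANT_TYPE selects plain text Kafka connections; any other value
// enables SASL OAUTHBEARER, using the CREDS_* values and AUTH_SERVICE_URL
// to fetch tokens.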
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
var bootstrapserver = os.Getenv("KAFKA_SERVER")
var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
var creds_service_url = os.Getenv("AUTH_SERVICE_URL")

// Limiter - valid for all jobs
const parallelism_limiter = 100 //For all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
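
// jobLimiterChan is used as a counting semaphore: a job acquires a slot by
// sending an empty struct and releases it by receiving one. A minimal sketch
// of the pattern (illustrative only, not part of this file):
//
//      jobLimiterChan <- struct{}{}                // block until a slot is free
//      go func() {
//              defer func() { <-jobLimiterChan }() // release the slot
//              // ... do the work ...
//      }()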

const typeLabel = " for type: "
const fetchTokenErrorMessage = "Cannot fetch token: "
const setTokenErrorMessage = "Cannot set token: "

// This function intentionally has high cognitive complexity //NOSONAR
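// StartTopicReader creates a Kafka consumer for the given topic and type id and
// forwards each received message to dataCh. An "EXIT" command on controlCh stops
// the reader, which signals its job handler by sending nil on dataCh. Consumer
// creation and topic subscription are retried once per second until they succeed.
// A hedged usage sketch (topic names and channel size are illustrative assumptions):
//
//      controlCh := make(chan dataTypes.ReaderControl)
//      dataCh := make(chan *dataTypes.KafkaPayload, 100)
//      go StartTopicReader("pm-topic", "PmData", controlCh, dataCh, "group-1", "client-1")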
func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {

        log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)

        topicOk := false
        var c *kafka.Consumer
        running := true

        for !topicOk {

                select {
                case readerCtrl := <-controlCh:
                        if readerCtrl.Command == "EXIT" {
                                log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
                                dataCh <- nil //Signal to job handler
                                running = false
                                return
                        }
                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                        if c == nil {
                                c = createKafkaConsumer(typeId, gid, cid)
                                if c == nil {
                                        log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
                                } else {
                                        log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
                                }
                        }
                        if c != nil && !topicOk {
                                err := c.SubscribeTopics([]string{topic}, nil)
                                if err != nil {
                                        log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err)
                                } else {
                                        log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
                                        topicOk = true
                                }
                        }
                }
        }
        log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)

        var eventChan = make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-c.Events():
                                switch evt.(type) {
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", evt)
                                        token, err := FetchToken()
                                        if err != nil {
                                                log.Warning(fetchTokenErrorMessage, err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning(setTokenErrorMessage, setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
                                }

                        case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()

        go func() {
                for {
                        select {
                        case readerCtrl := <-controlCh:
                                if readerCtrl.Command == "EXIT" {
                                        eventChan <- 0
                                        log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
                                        dataCh <- nil //Signal to job handler
                                        defer c.Close()
                                        return
                                }
                        default:
                                ev := c.Poll(1000)
                                if ev == nil {
                                        log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic)
                                        continue
                                }
                                switch e := ev.(type) {
                                case *kafka.Message:
                                        var kmsg dataTypes.KafkaPayload
                                        kmsg.Msg = e

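                                        // Note: the offset is committed before the message is
                                        // handed to the job handler, so delivery is at-most-once;
                                        // a crash after this commit loses the in-flight message.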
                                        if _, err := c.Commit(); err != nil {
                                                log.Warning("Topic reader commit failed on topic: ", topic, " error details: ", err)
                                        }

                                        dataCh <- &kmsg
                                        log.Debug("Reader msg: ", &kmsg)
                                        log.Debug("Reader - data_ch ", dataCh)
                                case kafka.Error:
                                        fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)

                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", ev)
                                        token, err := FetchToken()
                                        if err != nil {
                                                log.Warning(fetchTokenErrorMessage, err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning(setTokenErrorMessage, setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        fmt.Printf("Ignored %v\n", e)
                                }
                        }
                }
        }()
}

// This function intentionally has high cognitive complexity //NOSONAR
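// StartTopicWriter starts a Kafka producer and forwards each payload received
// on dataCh to the topic named in the payload. A nil payload on dataCh stops
// the writer; sends are retried up to ten times with a growing delay before a
// message is discarded. A hedged usage sketch (channel size is an illustrative
// assumption):
//
//      writerCtl := make(chan dataTypes.WriterControl)
//      dataCh := make(chan *dataTypes.KafkaPayload, 100)
//      go StartTopicWriter(writerCtl, dataCh)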
func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {

        var kafkaProducer *kafka.Producer

        running := true
        log.Info("Topic writer starting")

        // Wait for the kafka producer to become available. EXIT commands are
        // ignored here; the writer is stopped by a nil message on the data channel.
        for kafkaProducer == nil {
                select {
                case writerCtl := <-controlCh:
                        if writerCtl.Command == "EXIT" {
                                //ignore cmd
                        }
                default:
                        kafkaProducer = startProducer()
                        if kafkaProducer == nil {
                                log.Debug("Could not start kafka producer - retrying")
                                time.Sleep(1 * time.Second)
                        } else {
                                log.Debug("Kafka producer started")
                        }
                }
        }

        var eventChan = make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-kafkaProducer.Events():
                                switch m := evt.(type) {
                                case *kafka.Message:
                                        if m.TopicPartition.Error != nil {
                                                log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
                                        } else {
                                                log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
                                        }
                                case kafka.Error:
                                        log.Debug("Dumping topic writer event, error: ", evt)
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New producer token needed: ", evt)
                                        token, err := FetchToken()
                                        if err != nil {
                                                log.Warning(fetchTokenErrorMessage, err)
                                                kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning(setTokenErrorMessage, setTokenError)
                                                        kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic writer event, unknown: ", evt)
                                }

                        case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()
        go func() {
                for {
                        select {
                        case writerCtl := <-controlCh:
                                if writerCtl.Command == "EXIT" {
                                        // ignore - wait for channel signal
                                }

                        case kmsg := <-dataCh:
                                if kmsg == nil {
                                        eventChan <- 0
                                        log.Info("Topic writer stopped by channel signal - start_topic_writer")
                                        defer kafkaProducer.Close()
                                        return
                                }

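                                // Produce is retried with a linearly growing backoff
                                // (1s, 2s, ... up to 10s); after ten failed attempts the
                                // message is dropped and the last error is logged.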
                                retries := 10
                                msgOk := false
                                var err error
                                for retry := 1; retry <= retries && !msgOk; retry++ {
                                        err = kafkaProducer.Produce(&kafka.Message{
                                                TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
                                                Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)

                                        if err == nil {
                                                msgOk = true
                                                log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
                                        } else {
                                                log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
                                                time.Sleep(time.Duration(retry) * time.Second)
                                        }
                                }
                                if !msgOk {
                                        log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
                                }
                        case <-time.After(1000 * time.Millisecond):
                                if !running {
                                        return
                                }
                        }
                }
        }()
}

// We need to pass the kafka properties in this way for readability purposes //NOSONAR
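// createKafkaConsumer creates a consumer with manual offset commits. When
// CREDS_GRANT_TYPE is unset a plain text connection is used, otherwise
// SASL_PLAINTEXT with OAUTHBEARER. Returns nil if the consumer cannot be
// created, so the caller can retry.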
func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
        var cm kafka.ConfigMap
        if creds_grant_type == "" {
                log.Info("Creating kafka plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                }
        } else {
                log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                        "sasl.mechanism":     "OAUTHBEARER",
                        "security.protocol":  "SASL_PLAINTEXT",
                }
        }
        c, err := kafka.NewConsumer(&cm)

        if err != nil {
                log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
                return nil
        }

        log.Info("Created kafka consumer for type: ", typeId, " OK")
        return c
}

// We need to pass the kafka properties in this way for readability purposes //NOSONAR
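// startProducer creates a producer, plain text when CREDS_GRANT_TYPE is unset
// and SASL_PLAINTEXT with OAUTHBEARER otherwise. Returns nil on failure so the
// caller can retry.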
func startProducer() *kafka.Producer {
        log.Info("Creating kafka producer")

        var cm kafka.ConfigMap
        if creds_grant_type == "" {
                log.Info("Creating kafka plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapserver,
                }
        } else {
                log.Info("Creating kafka SASL plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapserver,
                        "sasl.mechanism":    "OAUTHBEARER",
                        "security.protocol": "SASL_PLAINTEXT",
                }
        }

        p, err := kafka.NewProducer(&cm)
        if err != nil {
                log.Error("Cannot create kafka producer, error details: ", err)
                return nil
        }
        return p
}
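// StartJobXmlFileData consumes file-collect events for the given type from
// dataInCh, converts each referenced XML file to json and publishes the result
// event to the output topics. A nil message on dataInCh stops the job; the
// number of concurrently running conversions is bounded by jobLimiterChan.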
func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutChannel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {

        log.Info("Type job ", typeId, " started")
        topicList := make(map[string]string)
        topicList[typeId] = "json-file-ready-kp"
        topicList["PmData"] = "json-file-ready-kpadp"
        running := true
        for {
                select {
                case jobCtl := <-controlCh:
                        log.Debug("Type job ", typeId, " new cmd received ", jobCtl.Command)
                        switch jobCtl.Command {
                        case "EXIT":
                                //ignore cmd - handled by channel signal
                        }

                case msg := <-dataInCh:
                        if msg == nil {
                                log.Info("Type job ", typeId, " stopped by channel signal - start_job_xml_file_data")

                                running = false
                                return
                        }
                        jobLimiterChan <- struct{}{}
                        go runXmlJob(typeId, msg, "gz", dataOutChannel, topicList, jobLimiterChan, fvolume, fsbucket)

                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                }
        }
}

// This function intentionally has more parameters for legacy compatibility //NOSONAR
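// runXmlJob unmarshals one XmlFileEventHeader, converts the referenced file via
// miniocollector.XmlToJsonConv and sends a FileDownloadedEvt, keyed by source
// name, to every topic in topicList. It releases its slot in jobLimiterChan on
// return. The outputCompression, fvolume and fsbucket parameters appear unused
// in this function and are kept for legacy compatibility.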
func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
        defer func() {
                <-jobLimiterChan
        }()
        start := time.Now()
        var evtData dataTypes.XmlFileEventHeader

        err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
        if err != nil {
                log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details: ", err)
                return
        }
        log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())

        start = time.Now()
        // XmlToJsonConv returns only the converted file name - there is no error to check here.
        newFn := miniocollector.XmlToJsonConv(&evtData)
        log.Debug("Converted file to json: ", newFn, " time: ", time.Since(start).String())

        var fde dataTypes.FileDownloadedEvt
        fde.Filename = newFn
        j, err := jsoniter.Marshal(fde)

        if err != nil {
                log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
                return
        }
        msg.Msg.Value = j

        msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
        log.Debug("Marshal file-collect event ", time.Since(start).String())
        log.Debug("Sending file-collect event to output topic(s): ", len(topicList))
        for _, v := range topicList {
                log.Debug("Output Topic: " + v)
                kmsg := new(dataTypes.KafkaPayload)
                kmsg.Msg = msg.Msg
                kmsg.Topic = v
                dataOutChannel <- kmsg
        }
}

// This function intentionally has some patterns in logs for easier identification //NOSONAR
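// FetchToken retrieves an access token from AUTH_SERVICE_URL using the OAuth2
// client-credentials flow and wraps it as a kafka.OAuthBearerToken. Note that
// the token value is written to the debug log, so debug logging should not be
// enabled where that is a concern.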
func FetchToken() (*kafka.OAuthBearerToken, error) {
        log.Debug("Get token inline")
        conf := &clientcredentials.Config{
                ClientID:     creds_client_id,
                ClientSecret: creds_client_secret,
                TokenURL:     creds_service_url,
        }
        token, err := conf.Token(context.Background())
        if err != nil {
                log.Warning("Cannot fetch access token: ", err)
                return nil, err
        }
        extensions := map[string]string{}

        log.Debug("=====================================================")
        log.Debug("token: ", token)
        log.Debug("=====================================================")
        log.Debug("TokenValue: ", token.AccessToken)
        log.Debug("=====================================================")
        log.Debug("Expiration: ", token.Expiry)
        oauthBearerToken := kafka.OAuthBearerToken{
                TokenValue: token.AccessToken,
                Expiration: token.Expiry,
                Extensions: extensions,
        }

        return &oauthBearerToken, nil
}