Improve code coverage for PM File Converter (Go)
nonrtric/plt/ranpm.git: pm-file-converter/components/kafkacollector/kafkacollector.go
// -
//
//      ========================LICENSE_START=================================
//      O-RAN-SC
//      %%
//      Copyright (C) 2023: Nordix Foundation
//      %%
//      Licensed under the Apache License, Version 2.0 (the "License");
//      you may not use this file except in compliance with the License.
//      You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
//      Unless required by applicable law or agreed to in writing, software
//      distributed under the License is distributed on an "AS IS" BASIS,
//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//      See the License for the specific language governing permissions and
//      limitations under the License.
//      ========================LICENSE_END===================================
package kafkacollector

import (
        "context"
        "fmt"
        "main/common/dataTypes"
        "main/components/miniocollector"
        "os"
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
        jsoniter "github.com/json-iterator/go"
        log "github.com/sirupsen/logrus"
        "golang.org/x/oauth2/clientcredentials"
)

var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
var bootstrapserver = os.Getenv("KAFKA_SERVER")
var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
var creds_service_url = os.Getenv("AUTH_SERVICE_URL")

// Limiter - valid for all jobs
const parallelism_limiter = 100
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
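// jobLimiterChan is a counting semaphore: a job acquires a slot by sending an
// empty struct before it starts and releases the slot when run_xml_job returns,
// so at most parallelism_limiter jobs run concurrently.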

// noinspection GoCognitiveComplexity
func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {

        log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)

        topic_ok := false
        var c *kafka.Consumer
        running := true

        for !topic_ok {

                select {
                case reader_ctrl := <-control_ch:
                        if reader_ctrl.Command == "EXIT" {
                                log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
                                data_ch <- nil //Signal to job handler
                                running = false
                                return
                        }
                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                        if c == nil {
                                c = create_kafka_consumer(type_id, gid, cid)
                                if c == nil {
                                        log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
                                } else {
                                        log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
                                }
                        }
                        if c != nil && !topic_ok {
                                err := c.SubscribeTopics([]string{topic}, nil)
                                if err != nil {
                                        log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
                                } else {
                                        log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
                                        topic_ok = true
                                }
                        }
                }
        }
        log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)

        var event_chan = make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-c.Events():
                                switch evt.(type) {
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", evt)
                                        token, err := Fetch_token()
                                        if err != nil {
                                                log.Warning("Cannot fetch token: ", err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning("Cannot set token: ", setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
                                }

                        case msg := <-event_chan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()

        go func() {
                for {
                        select {
                        case reader_ctrl := <-control_ch:
                                if reader_ctrl.Command == "EXIT" {
                                        event_chan <- 0
                                        log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
                                        data_ch <- nil //Signal to job handler
                                        c.Close()
                                        return
                                }
                        default:

                                ev := c.Poll(1000)
                                if ev == nil {
                                        log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
                                        continue
                                }
                                switch e := ev.(type) {
                                case *kafka.Message:
                                        var kmsg dataTypes.KafkaPayload
                                        kmsg.Msg = e

                                        c.Commit()

                                        data_ch <- &kmsg
                                        log.Debug("Reader msg: ", &kmsg)
                                        log.Debug("Reader - data_ch ", data_ch)
                                case kafka.Error:
                                        fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)

                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", ev)
                                        token, err := Fetch_token()
                                        if err != nil {
                                                log.Warning("Cannot fetch token: ", err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning("Cannot set token: ", setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        fmt.Printf("Ignored %v\n", e)
                                }
                        }
                }
        }()
}
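
// Minimal usage sketch (illustrative only - the topic name, type id, group id
// and client id below are assumptions, not values defined by this package):
//
//      reader_ctl := make(chan dataTypes.ReaderControl)
//      data_ch := make(chan *dataTypes.KafkaPayload, 100)
//      go Start_topic_reader("xml-file-ready", "xml-file-data", reader_ctl, data_ch, "pm-group", "pm-client-1")
//      ...
//      reader_ctl <- dataTypes.ReaderControl{Command: "EXIT"} // stops the reader; a nil payload signals the job handler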

func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {

        var kafka_producer *kafka.Producer

        running := true
        log.Info("Topic writer starting")

        // Wait for kafka producer to become available - and be prepared to exit the writer
        for kafka_producer == nil {
                select {
                case writer_ctl := <-control_ch:
                        if writer_ctl.Command == "EXIT" {
                                //ignore cmd
                        }
                default:
                        kafka_producer = start_producer()
                        if kafka_producer == nil {
                                log.Debug("Could not start kafka producer - retrying")
                                time.Sleep(1 * time.Second)
                        } else {
                                log.Debug("Kafka producer started")
                        }
                }
        }

        var event_chan = make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-kafka_producer.Events():
                                switch m := evt.(type) {
                                case *kafka.Message:
                                        if m.TopicPartition.Error != nil {
                                                log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
                                        } else {
                                                log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
                                        }
                                case kafka.Error:
                                        log.Debug("Dumping topic writer event, error: ", evt)
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New producer token needed: ", evt)
                                        token, err := Fetch_token()
                                        if err != nil {
                                                log.Warning("Cannot fetch token: ", err)
                                                kafka_producer.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := kafka_producer.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning("Cannot set token: ", setTokenError)
                                                        kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic writer event, unknown: ", evt)
                                }

                        case msg := <-event_chan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()
        go func() {
                for {
                        select {
                        case writer_ctl := <-control_ch:
                                if writer_ctl.Command == "EXIT" {
                                        // ignore - wait for channel signal
                                }

                        case kmsg := <-data_ch:
                                if kmsg == nil {
                                        event_chan <- 0
                                        log.Info("Topic writer stopped by channel signal - start_topic_writer")
                                        kafka_producer.Close()
                                        return
                                }

                                retries := 10
                                msg_ok := false
                                var err error
                                for retry := 1; retry <= retries && !msg_ok; retry++ {
                                        err = kafka_producer.Produce(&kafka.Message{
                                                TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
                                                Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)

                                        if err == nil {
                                                msg_ok = true
                                                log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
                                        } else {
                                                log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
                                                time.Sleep(time.Duration(retry) * time.Second)
                                        }
                                }
                                if !msg_ok {
                                        log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
                                }
                        case <-time.After(1000 * time.Millisecond):
                                if !running {
                                        return
                                }
                        }
                }
        }()
}
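
// Usage sketch (illustrative; the buffer size and message value are assumptions):
// the writer sends each KafkaPayload to its Topic and shuts down on a nil payload.
//
//      writer_ctl := make(chan dataTypes.WriterControl, 1)
//      out_ch := make(chan *dataTypes.KafkaPayload, 100)
//      Start_topic_writer(writer_ctl, out_ch)
//      out_ch <- &dataTypes.KafkaPayload{Topic: "json-file-ready-kp", Msg: &kafka.Message{Value: []byte("{}")}}
//      out_ch <- nil // stops the writer and closes the producer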

func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
        var cm kafka.ConfigMap
        if creds_grant_type == "" {
                log.Info("Creating kafka plain text consumer for type: ", type_id)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                }
        } else {
                log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                        "sasl.mechanism":     "OAUTHBEARER",
                        "security.protocol":  "SASL_PLAINTEXT",
                }
        }
        c, err := kafka.NewConsumer(&cm)

        if err != nil {
                log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
                return nil
        }

        log.Info("Created kafka consumer for type: ", type_id, " OK")
        return c
}
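
// Note: with SASL/OAUTHBEARER configured, the client emits
// kafka.OAuthBearerTokenRefresh events instead of fetching tokens itself;
// the event loops in Start_topic_reader and Start_topic_writer answer them
// by calling Fetch_token and installing the new token on the client.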

// Start kafka producer
func start_producer() *kafka.Producer {
        log.Info("Creating kafka producer")

        var cm kafka.ConfigMap
        if creds_grant_type == "" {
                log.Info("Creating kafka plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapserver,
                }
        } else {
                log.Info("Creating kafka SASL plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapserver,
                        "sasl.mechanism":    "OAUTHBEARER",
                        "security.protocol": "SASL_PLAINTEXT",
                }
        }

        p, err := kafka.NewProducer(&cm)
        if err != nil {
                log.Error("Cannot create kafka producer, ", err)
                return nil
        }
        return p
}

func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {

        log.Info("Type job ", type_id, " started")
        topic_list := make(map[string]string)
        topic_list[type_id] = "json-file-ready-kp"
        topic_list["PmData"] = "json-file-ready-kpadp"
        running := true
        for {
                select {
                case job_ctl := <-control_ch:
                        log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
                        switch job_ctl.Command {
                        case "EXIT":
                                //ignore cmd - handled by channel signal
                        }

                case msg := <-data_in_ch:
                        if msg == nil {
                                log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")

                                running = false
                                return
                        }
                        jobLimiterChan <- struct{}{}
                        go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)

                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                }
        }
}

func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
        defer func() {
                <-jobLimiterChan
        }()
        start := time.Now()
        var evt_data dataTypes.XmlFileEventHeader

        err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
        if err != nil {
                log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details: ", err)
                return
        }
        log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())

        start = time.Now()
        // Xml_to_json_conv returns only the converted file name; it does not
        // return an error to check here.
        new_fn := miniocollector.Xml_to_json_conv(&evt_data)
        log.Debug("Converted file to json: ", new_fn, " time: ", time.Since(start).String())

        var fde dataTypes.FileDownloadedEvt
        fde.Filename = new_fn
        j, err := jsoniter.Marshal(fde)

        if err != nil {
                log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
                return
        }
        msg.Msg.Value = j

        msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
        log.Debug("Marshal file-collect event ", time.Since(start).String())
        log.Debug("Sending file-collect event to output topic(s): ", len(topic_list))
        for _, v := range topic_list {
                fmt.Println("Output Topic: " + v)
                var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
                kmsg.Msg = msg.Msg
                kmsg.Topic = v
                data_out_channel <- kmsg
        }
}

func Fetch_token() (*kafka.OAuthBearerToken, error) {
        log.Debug("Get token inline")
        conf := &clientcredentials.Config{
                ClientID:     creds_client_id,
                ClientSecret: creds_client_secret,
                TokenURL:     creds_service_url,
        }
        token, err := conf.Token(context.Background())
        if err != nil {
                log.Warning("Cannot fetch access token: ", err)
                return nil, err
        }
        extensions := map[string]string{}
        log.Debug("=====================================================")
        log.Debug("token: ", token)
        log.Debug("=====================================================")
        log.Debug("TokenValue: ", token.AccessToken)
        log.Debug("=====================================================")
        log.Debug("Expiration: ", token.Expiry)
        t := token.Expiry
        oauthBearerToken := kafka.OAuthBearerToken{
                TokenValue: token.AccessToken,
                Expiration: t,
                Extensions: extensions,
        }

        return &oauthBearerToken, nil
}
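
// Coverage sketch for Fetch_token (would live in kafkacollector_test.go; the
// httptest wiring below is an illustrative assumption, not existing test code):
//
//      func TestFetchToken(t *testing.T) {
//              srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//                      w.Header().Set("Content-Type", "application/json")
//                      fmt.Fprint(w, `{"access_token":"abc","token_type":"bearer","expires_in":3600}`)
//              }))
//              defer srv.Close()
//              creds_service_url = srv.URL // package-level var, settable in-package
//              token, err := Fetch_token()
//              if err != nil || token.TokenValue != "abc" {
//                      t.Fatal("unexpected token result: ", err)
//              }
//      }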