// -
//
//      ========================LICENSE_START=================================
//      O-RAN-SC
//      %%
//      Copyright (C) 2023: Nordix Foundation
//      %%
//      Licensed under the Apache License, Version 2.0 (the "License");
//      you may not use this file except in compliance with the License.
//      You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
//      Unless required by applicable law or agreed to in writing, software
//      distributed under the License is distributed on an "AS IS" BASIS,
//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//      See the License for the specific language governing permissions and
//      limitations under the License.
//      ========================LICENSE_END===================================
package kafkacollector

import (
        "context"
        "fmt"
        "os"
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
        jsoniter "github.com/json-iterator/go"
        log "github.com/sirupsen/logrus"
        "golang.org/x/oauth2/clientcredentials"

        "main/common/dataTypes"
        "main/components/miniocollector"
)
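
// Configuration is read from the environment. When CREDS_GRANT_TYPE is unset,
// plaintext Kafka connections are used; when it is set, SASL_PLAINTEXT with the
// OAUTHBEARER mechanism is configured and tokens are fetched from AUTH_SERVICE_URL.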
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
var bootstrapserver = os.Getenv("KAFKA_SERVER")
var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
var creds_service_url = os.Getenv("AUTH_SERVICE_URL")

// Limiter - valid for all jobs
const parallelism_limiter = 100

var jobLimiterChan = make(chan struct{}, parallelism_limiter)
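
// The buffered channel acts as a counting semaphore: a job acquires a slot by
// sending an empty struct before it starts and releases the slot by receiving
// when it is done. This is the pattern used by Start_job_xml_file_data and
// run_xml_job below:
//
//      jobLimiterChan <- struct{}{}            // acquire - blocks when parallelism_limiter jobs are running
//      go func() {
//              defer func() { <-jobLimiterChan }() // release
//              // ... process one file ...
//      }()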
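
// Start_topic_reader creates a kafka consumer subscribed to the given topic and
// forwards each received message on data_ch; a nil payload on data_ch signals
// that the reader has stopped. An "EXIT" command on control_ch stops the reader.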
func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {

        log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)

        topic_ok := false
        var c *kafka.Consumer
        running := true

        for !topic_ok {

                select {
                case reader_ctrl := <-control_ch:
                        if reader_ctrl.Command == "EXIT" {
                                log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
                                data_ch <- nil //Signal to job handler
                                running = false
                                return
                        }
                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                        if c == nil {
                                c = create_kafka_consumer(type_id, gid, cid)
                                if c == nil {
                                        log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
                                } else {
                                        log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
                                }
                        }
                        if c != nil && !topic_ok {
                                err := c.SubscribeTopics([]string{topic}, nil)
                                if err != nil {
                                        log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying - error details: ", err)
                                } else {
                                        log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
                                        topic_ok = true
                                }
                        }
                }
        }
        log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)

        event_chan := make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-c.Events():
                                switch evt.(type) {
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", evt)
                                        token, err := Fetch_token()
                                        if err != nil {
                                                log.Warning("Cannot fetch token: ", err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning("Cannot set token: ", setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
                                }

                        case msg := <-event_chan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()

        go func() {
                for {
                        select {
                        case reader_ctrl := <-control_ch:
                                if reader_ctrl.Command == "EXIT" {
                                        event_chan <- 0
                                        log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
                                        data_ch <- nil //Signal to job handler
                                        c.Close()
                                        return
                                }
                        default:
                                ev := c.Poll(1000)
                                if ev == nil {
                                        log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
                                        continue
                                }
                                switch e := ev.(type) {
                                case *kafka.Message:
                                        var kmsg dataTypes.KafkaPayload
                                        kmsg.Msg = e

                                        c.Commit()

                                        data_ch <- &kmsg
                                        log.Debug("Reader msg: ", &kmsg)
                                        log.Debug("Reader - data_ch ", data_ch)
                                case kafka.Error:
                                        fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)

                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", ev)
                                        token, err := Fetch_token()
                                        if err != nil {
                                                log.Warning("Cannot fetch token: ", err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning("Cannot set token: ", setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        fmt.Printf("Ignored %v\n", e)
                                }
                        }
                }
        }()
}
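
// Start_topic_writer starts the topic writer, which consumes KafkaPayload
// messages from data_ch and produces each one to the topic named in the payload.
// A nil payload on data_ch stops the writer. Hypothetical wiring (the channel
// size is an assumption):
//
//      writerCtl := make(chan dataTypes.WriterControl)
//      outCh := make(chan *dataTypes.KafkaPayload, 100)
//      go Start_topic_writer(writerCtl, outCh)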
func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {

        var kafka_producer *kafka.Producer

        running := true
        log.Info("Topic writer starting")

        // Wait for the kafka producer to become available - and be prepared to exit the writer
        for kafka_producer == nil {
                select {
                case writer_ctl := <-control_ch:
                        if writer_ctl.Command == "EXIT" {
                                //ignore cmd
                        }
                default:
                        kafka_producer = start_producer()
                        if kafka_producer == nil {
                                log.Debug("Could not start kafka producer - retrying")
                                time.Sleep(1 * time.Second)
                        } else {
                                log.Debug("Kafka producer started")
                        }
                }
        }

        event_chan := make(chan int)
        go func() {
                for {
                        select {
                        case evt := <-kafka_producer.Events():
                                switch m := evt.(type) {
                                case *kafka.Message:
                                        if m.TopicPartition.Error != nil {
                                                log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
                                        } else {
                                                log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
                                        }
                                case kafka.Error:
                                        log.Debug("Dumping topic writer event, error: ", m)
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New producer token needed: ", evt)
                                        token, err := Fetch_token()
                                        if err != nil {
                                                log.Warning("Cannot fetch token: ", err)
                                                kafka_producer.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := kafka_producer.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
                                                        log.Warning("Cannot set token: ", setTokenError)
                                                        kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic writer event, unknown: ", evt)
                                }

                        case msg := <-event_chan:
                                if msg == 0 {
                                        return
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()
        go func() {
                for {
                        select {
                        case writer_ctl := <-control_ch:
                                if writer_ctl.Command == "EXIT" {
                                        // ignore - wait for channel signal
                                }

                        case kmsg := <-data_ch:
                                if kmsg == nil {
                                        event_chan <- 0
                                        log.Info("Topic writer stopped by channel signal - start_topic_writer")
                                        kafka_producer.Close()
                                        return
                                }

                                retries := 10
                                msg_ok := false
                                var err error
                                for retry := 1; retry <= retries && !msg_ok; retry++ {
                                        err = kafka_producer.Produce(&kafka.Message{
                                                TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
                                                Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)

                                        if err == nil {
                                                msg_ok = true
                                                log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
                                        } else {
                                                log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
                                                time.Sleep(time.Duration(retry) * time.Second)
                                        }
                                }
                                if !msg_ok {
                                        log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
                                }
                        case <-time.After(1 * time.Second):
                                if !running {
                                        return
                                }
                        }
                }
        }()
}
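
// create_kafka_consumer creates a consumer whose config depends on whether OAuth
// credentials are configured: plaintext when CREDS_GRANT_TYPE is unset, otherwise
// SASL_PLAINTEXT with the OAUTHBEARER mechanism. Returns nil if the consumer
// cannot be created; the caller retries.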
func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
        var cm kafka.ConfigMap
        if creds_grant_type == "" {
                log.Info("Creating kafka plain text consumer for type: ", type_id)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                }
        } else {
                log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
                        "sasl.mechanism":     "OAUTHBEARER",
                        "security.protocol":  "SASL_PLAINTEXT",
                }
        }
        c, err := kafka.NewConsumer(&cm)

        if err != nil {
                log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
                return nil
        }

        log.Info("Created kafka consumer for type: ", type_id, " OK")
        return c
}

// start_producer creates a kafka producer: plaintext when no OAuth credentials
// are configured, SASL_PLAINTEXT with OAUTHBEARER otherwise. Returns nil on failure.
func start_producer() *kafka.Producer {
        log.Info("Creating kafka producer")

        var cm kafka.ConfigMap
        if creds_grant_type == "" {
                log.Info("Creating kafka plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapserver,
                }
        } else {
                log.Info("Creating kafka SASL plain text producer")
                cm = kafka.ConfigMap{
                        "bootstrap.servers": bootstrapserver,
                        "sasl.mechanism":    "OAUTHBEARER",
                        "security.protocol": "SASL_PLAINTEXT",
                }
        }

        p, err := kafka.NewProducer(&cm)
        if err != nil {
                log.Error("Cannot create kafka producer, ", err)
                return nil
        }
        return p
}
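
// Start_job_xml_file_data runs the per-type job loop: for each file-ready event
// received on data_in_ch it dispatches run_xml_job on a goroutine, bounded by
// jobLimiterChan. A nil payload on data_in_ch stops the job.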
func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {

        log.Info("Type job ", type_id, " started")
        topic_list := make(map[string]string)
        topic_list[type_id] = "json-file-ready-kp"
        topic_list["PmData"] = "json-file-ready-kpadp"
        running := true
        for {
                select {
                case job_ctl := <-control_ch:
                        log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
                        switch job_ctl.Command {
                        case "EXIT":
                                //ignore cmd - handled by channel signal
                        }

                case msg := <-data_in_ch:
                        if msg == nil {
                                log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")

                                running = false
                                return
                        }
                        jobLimiterChan <- struct{}{}
                        go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)

                case <-time.After(1 * time.Second):
                        if !running {
                                return
                        }
                }
        }
}
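
// run_xml_job converts one collected XML PM file to json (via miniocollector),
// then publishes a FileDownloadedEvt, keyed by source name, to every output
// topic in topic_list. The slot in jobLimiterChan is released on return.
// Note that the outputCompression parameter is not used in this function.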
func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
        defer func() {
                <-jobLimiterChan
        }()
        start := time.Now()
        var evt_data dataTypes.XmlFileEventHeader

        err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
        if err != nil {
                log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details: ", err)
                return
        }
        log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())

        start = time.Now()
        new_fn := miniocollector.Xml_to_json_conv(&evt_data)
        log.Debug("Converted file to json: ", new_fn, " time: ", time.Since(start).String())

        start = time.Now()
        var fde dataTypes.FileDownloadedEvt
        fde.Filename = new_fn
        j, err := jsoniter.Marshal(fde)

        if err != nil {
                log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
                return
        }
        msg.Msg.Value = j

        msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
        log.Debug("Marshal file-collect event ", time.Since(start).String())
        log.Debug("Sending file-collect event to output topic(s): ", len(topic_list))
        for _, v := range topic_list {
                log.Debug("Output topic: ", v)
                kmsg := &dataTypes.KafkaPayload{Msg: msg.Msg, Topic: v}
                data_out_channel <- kmsg
        }
}
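
// Fetch_token fetches an access token from the configured OAuth2 token endpoint
// using the client-credentials grant and wraps it as a kafka.OAuthBearerToken
// for the confluent client.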
func Fetch_token() (*kafka.OAuthBearerToken, error) {
        log.Debug("Get token inline")
        conf := &clientcredentials.Config{
                ClientID:     creds_client_id,
                ClientSecret: creds_client_secret,
                TokenURL:     creds_service_url,
        }
        token, err := conf.Token(context.Background())
        if err != nil {
                log.Warning("Cannot fetch access token: ", err)
                return nil, err
        }
        extensions := map[string]string{}
        log.Debug("token: ", token)
        log.Debug("TokenValue: ", token.AccessToken)
        log.Debug("Expiration: ", token.Expiry)
        oauthBearerToken := kafka.OAuthBearerToken{
                TokenValue: token.AccessToken,
                Expiration: token.Expiry,
                Extensions: extensions,
        }

        return &oauthBearerToken, nil
}
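
// End-to-end wiring sketch (hypothetical values: the input topic name, group id,
// client id, volume and bucket below are assumptions, not part of this package):
//
//      writerCtl := make(chan dataTypes.WriterControl)
//      outCh := make(chan *dataTypes.KafkaPayload, 100)
//      go Start_topic_writer(writerCtl, outCh)
//
//      readerCtl := make(chan dataTypes.ReaderControl)
//      inCh := make(chan *dataTypes.KafkaPayload, 100)
//      go Start_topic_reader("xml-file-ready", "PmData", readerCtl, inCh, "pm-group", "pm-client")
//
//      jobCtl := make(chan dataTypes.JobControl)
//      go Start_job_xml_file_data("PmData", jobCtl, inCh, outCh, "/pm-files", "pm-bucket")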