3 // ========================LICENSE_START=================================
6 // Copyright (C) 2023: Nordix Foundation
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
12 // http://www.apache.org/licenses/LICENSE-2.0
14 // Unless required by applicable law or agreed to in writing, software
15 // distributed under the License is distributed on an "AS IS" BASIS,
16 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 // See the License for the specific language governing permissions and
18 // limitations under the License.
19 // ========================LICENSE_END===================================
20 package kafkacollector
25 "main/common/dataTypes"
26 "main/components/miniocollector"
30 "github.com/confluentinc/confluent-kafka-go/kafka"
31 jsoniter "github.com/json-iterator/go"
32 log "github.com/sirupsen/logrus"
33 "golang.org/x/oauth2/clientcredentials"
// OAuth2 client-credentials and Kafka connection settings, read from the
// environment at package init. An empty creds_grant_type selects the
// plain-text (non-SASL) Kafka client configuration in the functions below.
36 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
37 var bootstrapserver = os.Getenv("KAFKA_SERVER")
38 var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
39 var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
40 var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
42 // Limiter - valid for all jobs
43 const parallelism_limiter = 100 //For all jobs
// Buffered channel used as a counting semaphore: a slot is taken before
// each job goroutine is started (see StartJobXmlFileData), capping the
// number of concurrently running jobs across all job types.
44 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
// Shared log-message fragments, factored out to avoid repeating the same
// string literals throughout the reader/writer loops.
46 const typeLabel = " for type: "
47 const fetchTokenErrorMessage = "Cannot fetch token: "
48 const setTokenErrorMessage = "Cannot set token: "
50 // This function intentionally has high cognitive complexity // NOSONAR
// StartTopicReader runs the reader loop for one Kafka topic/type pair.
// It retries consumer creation and topic subscription until both succeed,
// staying responsive to an "EXIT" command on controlCh; on exit it signals
// the job handler by sending nil on dataCh. Once running it forwards
// consumed messages on dataCh and refreshes the OAuth bearer token when
// the client library requests it.
51 func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
53 log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
56 var c *kafka.Consumer = nil
// Retry loop: keep going until the consumer is created and subscribed
// (topicOk), checking controlCh for EXIT between one-second attempts.
59 for topicOk == false {
62 case readerCtrl := <-controlCh:
63 if readerCtrl.Command == "EXIT" {
64 log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
// nil on dataCh tells the job handler this reader has terminated.
65 dataCh <- nil //Signal to job handler
69 case <-time.After(1 * time.Second):
74 c = createKafkaConsumer(typeId, gid, cid)
76 log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
78 log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
81 if c != nil && topicOk == false {
82 err := c.SubscribeTopics([]string{topic}, nil)
84 log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err)
86 log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
92 log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
// Internal channel used to coordinate the event-handling and polling
// parts of the reader below.
94 var eventChan = make(chan int)
98 case evt := <-c.Events():
100 case kafka.OAuthBearerTokenRefresh:
// The client library signalled that a fresh OAuth bearer token is
// needed; fetch one and install it, or report the failure back so the
// library can surface the authentication error.
101 log.Debug("New consumer token needed: ", evt)
102 token, err := FetchToken()
104 log.Warning(fetchTokenErrorMessage, err)
105 c.SetOAuthBearerTokenFailure(err.Error())
107 setTokenError := c.SetOAuthBearerToken(*token)
108 if setTokenError != nil {
109 log.Warning(setTokenErrorMessage, setTokenError)
110 c.SetOAuthBearerTokenFailure(setTokenError.Error())
114 log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
117 case msg := <-eventChan:
121 case <-time.After(1 * time.Second):
133 case readerCtrl := <-controlCh:
134 if readerCtrl.Command == "EXIT" {
136 log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
137 dataCh <- nil //Signal to job handler
145 log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic)
// Dispatch on the polled event type: a data message, a kafka error, or
// a token-refresh request; anything else is logged and ignored.
148 switch e := ev.(type) {
150 var kmsg dataTypes.KafkaPayload
156 log.Debug("Reader msg: ", &kmsg)
157 log.Debug("Reader - data_ch ", dataCh)
159 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
161 case kafka.OAuthBearerTokenRefresh:
162 log.Debug("New consumer token needed: ", ev)
163 token, err := FetchToken()
165 log.Warning(fetchTokenErrorMessage, err)
166 c.SetOAuthBearerTokenFailure(err.Error())
168 setTokenError := c.SetOAuthBearerToken(*token)
169 if setTokenError != nil {
170 log.Warning(setTokenErrorMessage, setTokenError)
171 c.SetOAuthBearerTokenFailure(setTokenError.Error())
175 fmt.Printf("Ignored %v\n", e)
183 // This function intentionally has high cognitive complexity // NOSONAR
// StartTopicWriter runs the shared Kafka producer loop. It retries
// producer creation until it succeeds (honoring "EXIT" on controlCh),
// then forwards payloads from dataCh to Kafka, retrying failed sends
// with a linearly growing back-off and discarding a message once the
// retries are exhausted. OAuth bearer tokens are refreshed on demand.
184 func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
186 var kafkaProducer *kafka.Producer
189 log.Info("Topic writer starting")
191 // Wait for kafka producer to become available - and be prepared to exit the writer
192 for kafkaProducer == nil {
194 case writerCtl := <-controlCh:
195 if writerCtl.Command == "EXIT" {
199 kafkaProducer = startProducer()
200 if kafkaProducer == nil {
201 log.Debug("Could not start kafka producer - retrying")
202 time.Sleep(1 * time.Second)
204 log.Debug("Kafka producer started")
// Internal coordination channel (mirrors the reader's eventChan).
209 var eventChan = make(chan int)
213 case evt := <-kafkaProducer.Events():
// Delivery report: log whether the message reached its topic/partition.
216 m := evt.(*kafka.Message)
218 if m.TopicPartition.Error != nil {
219 log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
221 log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
224 log.Debug("Dumping topic writer event, error: ", evt)
225 case kafka.OAuthBearerTokenRefresh:
// Fetch and install a fresh OAuth token, or report the failure back
// to the client library.
226 log.Debug("New producer token needed: ", evt)
227 token, err := FetchToken()
229 log.Warning(fetchTokenErrorMessage, err)
230 kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
232 setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
233 if setTokenError != nil {
234 log.Warning(setTokenErrorMessage, setTokenError)
235 kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
239 log.Debug("Dumping topic writer event, unknown: ", evt)
242 case msg := <-eventChan:
246 case <-time.After(1 * time.Second):
256 case writerCtl := <-controlCh:
257 if writerCtl.Command == "EXIT" {
258 // ignore - wait for channel signal
261 case kmsg := <-dataCh:
// A nil payload on dataCh is the stop signal for the writer.
264 log.Info("Topic writer stopped by channel signal - start_topic_writer")
265 defer kafkaProducer.Close()
// Retry the send up to `retries` times, sleeping `retry` seconds
// between attempts (linear back-off: 1s, 2s, 3s, ...).
272 for retry := 1; retry <= retries && msgOk == false; retry++ {
273 err = kafkaProducer.Produce(&kafka.Message{
274 TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
275 Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
279 log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
281 log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
282 time.Sleep(time.Duration(retry) * time.Second)
286 log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
288 case <-time.After(1000 * time.Millisecond):
297 // This function intentionally has high cognitive complexity // NOSONAR
// createKafkaConsumer builds a Kafka consumer for the given type, group id
// and client id. With no CREDS_GRANT_TYPE configured a plain-text consumer
// is created; otherwise SASL_PLAINTEXT with the OAUTHBEARER mechanism is
// used. Creation failures are logged; the caller (StartTopicReader)
// retries until a consumer is obtained.
298 func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
299 var cm kafka.ConfigMap
300 if creds_grant_type == "" {
301 log.Info("Creating kafka plain text consumer for type: ", typeId)
302 cm = kafka.ConfigMap{
303 "bootstrap.servers": bootstrapserver,
// Consume from the newest offset; automatic offset commit is disabled.
306 "auto.offset.reset": "latest",
307 "enable.auto.commit": false,
310 log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
311 cm = kafka.ConfigMap{
312 "bootstrap.servers": bootstrapserver,
315 "auto.offset.reset": "latest",
316 "enable.auto.commit": false,
317 "sasl.mechanism": "OAUTHBEARER",
318 "security.protocol": "SASL_PLAINTEXT",
321 c, err := kafka.NewConsumer(&cm)
324 log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
328 log.Info("Created kafka consumer for type: ", typeId, " OK")
332 // Start kafka producer
// startProducer creates the shared Kafka producer: plain text when no
// CREDS_GRANT_TYPE is configured, otherwise SASL_PLAINTEXT with the
// OAUTHBEARER mechanism. The caller (StartTopicWriter) retries on failure.
334 func startProducer() *kafka.Producer {
335 log.Info("Creating kafka producer")
337 var cm kafka.ConfigMap
338 if creds_grant_type == "" {
// NOTE(review): this branch builds the plain-text (non-SASL) config,
// but the log text below says "SASL" - it looks copy-pasted from the
// else branch (compare the consumer's "plain text" message in
// createKafkaConsumer); confirm and correct the message text.
339 log.Info("Creating kafka SASL plain text producer")
340 cm = kafka.ConfigMap{
341 "bootstrap.servers": bootstrapserver,
344 log.Info("Creating kafka SASL plain text producer")
345 cm = kafka.ConfigMap{
346 "bootstrap.servers": bootstrapserver,
347 "sasl.mechanism": "OAUTHBEARER",
348 "security.protocol": "SASL_PLAINTEXT",
352 p, err := kafka.NewProducer(&cm)
354 log.Error("Cannot create kafka producer,", err)
// StartJobXmlFileData runs the job loop for XML file-collect events of the
// given type: it maps the type to its output topic(s), reacts to commands
// on control_ch, and dispatches each incoming payload to a runXmlJob
// goroutine, throttled by the global jobLimiterChan semaphore.
360 func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
362 log.Info("Type job", type_id, " started")
// Output topics: one keyed by the job's own type and a fixed entry for
// the "PmData" type.
363 topicList := make(map[string]string)
364 topicList[type_id] = "json-file-ready-kp"
365 topicList["PmData"] = "json-file-ready-kpadp"
369 case jobCtl := <-control_ch:
370 log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
371 switch jobCtl.Command {
373 //ignore cmd - handled by channel signal
376 case msg := <-data_in_ch:
// A nil payload on data_in_ch is the stop signal for this job loop.
378 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
// Take a slot in the global job limiter before spawning the worker
// goroutine (presumably released by runXmlJob when done - confirm).
383 jobLimiterChan <- struct{}{}
384 go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
386 case <-time.After(1 * time.Second):
394 // This function intentionally has more parameters for legacy compatibility // NOSONAR
// runXmlJob processes one XML file-collect event: it unmarshals the
// XmlFileEventHeader from the payload, converts the referenced file to
// json via the minio collector, marshals a FileDownloadedEvt, and fans the
// resulting message out to every topic in topicList via dataOutChannel.
// Per-step timings are logged at debug level; parse and convert failures
// discard the message.
395 func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
400 var evtData dataTypes.XmlFileEventHeader
402 err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
404 log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err)
407 log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())
410 newFn := miniocollector.XmlToJsonConv(&evtData)
412 log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err)
415 log.Debug("Converted file to json: ", newFn, " time", time.Since(start).String())
417 var fde dataTypes.FileDownloadedEvt
419 j, err := jsoniter.Marshal(fde)
422 log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
// Key the outgoing message with the JSON-quoted source name.
427 msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
428 log.Debug("Marshal file-collect event ", time.Since(start).String())
429 log.Debug("Sending file-collect event to output topic(s)", len(topicList))
// Fan the same event out to every configured output topic.
430 for _, v := range topicList {
431 fmt.Println("Output Topic: " + v)
432 var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
435 dataOutChannel <- kmsg
// FetchToken obtains an OAuth2 access token via the client-credentials
// flow (using the CREDS_CLIENT_ID / CREDS_CLIENT_SECRET / AUTH_SERVICE_URL
// environment settings) and wraps it as a kafka.OAuthBearerToken for use
// by the consumer and producer token-refresh handlers.
440 func FetchToken() (*kafka.OAuthBearerToken, error) {
441 log.Debug("Get token inline")
442 conf := &clientcredentials.Config{
443 ClientID: creds_client_id,
444 ClientSecret: creds_client_secret,
445 TokenURL: creds_service_url,
447 token, err := conf.Token(context.Background())
449 log.Warning("Cannot fetch access token: ", err)
452 extensions := map[string]string{}
// NOTE(review): the raw access token is written to the debug log below;
// even at debug level this risks credential leakage - consider redacting.
454 log.Debug("=====================================================")
455 log.Debug("token: ", token)
456 log.Debug("=====================================================")
457 log.Debug("TokenValue: ", token.AccessToken)
458 log.Debug("=====================================================")
459 log.Debug("Expiration: ", token.Expiry)
461 oauthBearerToken := kafka.OAuthBearerToken{
462 TokenValue: token.AccessToken,
464 Extensions: extensions,
467 return &oauthBearerToken, nil