3 // ========================LICENSE_START=================================
6 // Copyright (C) 2023: Nordix Foundation
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
12 // http://www.apache.org/licenses/LICENSE-2.0
14 // Unless required by applicable law or agreed to in writing, software
15 // distributed under the License is distributed on an "AS IS" BASIS,
16 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 // See the License for the specific language governing permissions and
18 // limitations under the License.
19 // ========================LICENSE_END===================================
22 package kafkacollector
27 "main/common/dataTypes"
28 "main/components/miniocollector"
32 "github.com/confluentinc/confluent-kafka-go/kafka"
33 jsoniter "github.com/json-iterator/go"
34 log "github.com/sirupsen/logrus"
35 "golang.org/x/oauth2/clientcredentials"
// OAuth2 / Kafka connection settings, read once from the environment at startup.
// An empty CREDS_GRANT_TYPE selects plaintext Kafka; any other value enables
// SASL/OAUTHBEARER (see createKafkaConsumer / startProducer below).
38 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
39 var bootstrapserver = os.Getenv("KAFKA_SERVER")
40 var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
41 var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
42 var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
44 // Limiter - valid for all jobs
45 const parallelism_limiter = 100 //For all jobs
// Buffered channel used as a counting semaphore: each job acquires a slot by
// sending struct{}{} and releases it on completion, capping concurrency at
// parallelism_limiter across all jobs.
46 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
// Shared log-message fragments, factored out to keep log calls consistent.
48 const typeLabel = " for type: "
49 const fetchTokenErrorMessage = "Cannot fetch token: "
50 const setTokenErrorMessage = "Cannot set token: "
// StartTopicReader consumes messages from the given Kafka topic for job type
// typeId and forwards each payload on dataCh. Sending nil on dataCh signals the
// job handler that the reader has stopped. controlCh carries reader commands
// ("EXIT" stops the reader); gid and cid are the consumer group id and client id.
// NOTE(review): this excerpt is non-contiguous — lines are elided between the
// statements shown, so braces/select headers below do not pair up in this view.
53 func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
55 log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
58 var c *kafka.Consumer = nil
// Phase 1: create the consumer and subscribe, retrying once per second until
// it succeeds or an EXIT command arrives.
61 for topicOk == false {
64 case readerCtrl := <-controlCh:
65 if readerCtrl.Command == "EXIT" {
66 log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
// nil on dataCh tells the downstream job handler this reader is gone.
67 dataCh <- nil //Signal to job handler
71 case <-time.After(1 * time.Second):
76 c = createKafkaConsumer(typeId, gid, cid)
78 log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
80 log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
83 if c != nil && topicOk == false {
84 err := c.SubscribeTopics([]string{topic}, nil)
86 log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err)
88 log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
94 log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
// Phase 2: event/poll loops (several lines elided in this excerpt).
96 var eventChan = make(chan int)
100 case evt := <-c.Events():
// Broker signalled that the SASL OAuth bearer token must be refreshed.
102 case kafka.OAuthBearerTokenRefresh:
103 log.Debug("New consumer token needed: ", evt)
104 token, err := FetchToken()
106 log.Warning(fetchTokenErrorMessage, err)
// Telling librdkafka the refresh failed lets it retry later.
107 c.SetOAuthBearerTokenFailure(err.Error())
109 setTokenError := c.SetOAuthBearerToken(*token)
110 if setTokenError != nil {
111 log.Warning(setTokenErrorMessage, setTokenError)
112 c.SetOAuthBearerTokenFailure(setTokenError.Error())
116 log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
119 case msg := <-eventChan:
123 case <-time.After(1 * time.Second):
135 case readerCtrl := <-controlCh:
136 if readerCtrl.Command == "EXIT" {
138 log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
139 dataCh <- nil //Signal to job handler
147 log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic)
// Dispatch on the concrete event type returned by the consumer poll.
150 switch e := ev.(type) {
152 var kmsg dataTypes.KafkaPayload
158 log.Debug("Reader msg: ", &kmsg)
159 log.Debug("Reader - data_ch ", dataCh)
161 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
// Same token-refresh handling as above, duplicated for the poll-based loop.
163 case kafka.OAuthBearerTokenRefresh:
164 log.Debug("New consumer token needed: ", ev)
165 token, err := FetchToken()
167 log.Warning(fetchTokenErrorMessage, err)
168 c.SetOAuthBearerTokenFailure(err.Error())
170 setTokenError := c.SetOAuthBearerToken(*token)
171 if setTokenError != nil {
172 log.Warning(setTokenErrorMessage, setTokenError)
173 c.SetOAuthBearerTokenFailure(setTokenError.Error())
// Any other event type is ignored (printed for visibility only).
177 fmt.Printf("Ignored %v\n", e)
// StartTopicWriter publishes KafkaPayload messages received on dataCh to their
// per-message topic (kmsg.Topic) via a single shared producer. controlCh
// carries writer commands ("EXIT" stops the writer). Failed sends are retried
// with a linearly increasing backoff before the message is discarded.
// NOTE(review): this excerpt is non-contiguous — lines are elided between the
// statements shown, so braces/select headers below do not pair up in this view.
186 func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
188 var kafkaProducer *kafka.Producer
191 log.Info("Topic writer starting")
193 // Wait for kafka producer to become available - and be prepared to exit the writer
194 for kafkaProducer == nil {
196 case writerCtl := <-controlCh:
197 if writerCtl.Command == "EXIT" {
201 kafkaProducer = startProducer()
202 if kafkaProducer == nil {
203 log.Debug("Could not start kafka producer - retrying")
204 time.Sleep(1 * time.Second)
206 log.Debug("Kafka producer started")
// Drain producer delivery/error events (runs alongside the send loop below).
211 var eventChan = make(chan int)
215 case evt := <-kafkaProducer.Events():
// Delivery report: per-message success/failure from librdkafka.
218 m := evt.(*kafka.Message)
220 if m.TopicPartition.Error != nil {
221 log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
223 log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
226 log.Debug("Dumping topic writer event, error: ", evt)
// Broker signalled that the SASL OAuth bearer token must be refreshed.
227 case kafka.OAuthBearerTokenRefresh:
228 log.Debug("New producer token needed: ", evt)
229 token, err := FetchToken()
231 log.Warning(fetchTokenErrorMessage, err)
232 kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
234 setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
235 if setTokenError != nil {
236 log.Warning(setTokenErrorMessage, setTokenError)
237 kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
241 log.Debug("Dumping topic writer event, unknown: ", evt)
244 case msg := <-eventChan:
248 case <-time.After(1 * time.Second):
258 case writerCtl := <-controlCh:
259 if writerCtl.Command == "EXIT" {
260 // ignore - wait for channel signal
// Main send path: a nil payload is the stop signal from upstream.
263 case kmsg := <-dataCh:
266 log.Info("Topic writer stopped by channel signal - start_topic_writer")
267 defer kafkaProducer.Close()
// Retry loop with linear backoff (retry seconds per attempt).
274 for retry := 1; retry <= retries && msgOk == false; retry++ {
275 err = kafkaProducer.Produce(&kafka.Message{
276 TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
277 Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
281 log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
283 log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
284 time.Sleep(time.Duration(retry) * time.Second)
// All retries exhausted: message is dropped, not re-queued.
288 log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
290 case <-time.After(1000 * time.Millisecond):
// createKafkaConsumer builds a kafka.Consumer for job type typeId using the
// supplied group id (gid) and client id (cid). When CREDS_GRANT_TYPE is empty
// a plaintext consumer is configured; otherwise SASL_PLAINTEXT with the
// OAUTHBEARER mechanism is used (token supplied via FetchToken on refresh
// events). Returns nil if the consumer cannot be created.
// NOTE(review): excerpt is non-contiguous — group.id/client.id settings and the
// return statements are among the elided lines.
300 func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
301 var cm kafka.ConfigMap
302 if creds_grant_type == "" {
303 log.Info("Creating kafka plain text consumer for type: ", typeId)
304 cm = kafka.ConfigMap{
305 "bootstrap.servers": bootstrapserver,
// Start at the latest offset and commit manually (enable.auto.commit=false).
308 "auto.offset.reset": "latest",
309 "enable.auto.commit": false,
312 log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
313 cm = kafka.ConfigMap{
314 "bootstrap.servers": bootstrapserver,
317 "auto.offset.reset": "latest",
318 "enable.auto.commit": false,
319 "sasl.mechanism": "OAUTHBEARER",
320 "security.protocol": "SASL_PLAINTEXT",
323 c, err := kafka.NewConsumer(&cm)
326 log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
330 log.Info("Created kafka consumer for type: ", typeId, " OK")
// startProducer creates the shared kafka.Producer: plaintext when
// CREDS_GRANT_TYPE is empty, otherwise SASL_PLAINTEXT with OAUTHBEARER.
// Returns nil if the producer cannot be created.
335 func startProducer() *kafka.Producer {
336 log.Info("Creating kafka producer")
338 var cm kafka.ConfigMap
339 if creds_grant_type == "" {
// NOTE(review): this is the plaintext branch, but the log text says "SASL
// plain text producer" — looks copy-pasted from the branch below; confirm
// and fix the wording.
340 log.Info("Creating kafka SASL plain text producer")
341 cm = kafka.ConfigMap{
342 "bootstrap.servers": bootstrapserver,
345 log.Info("Creating kafka SASL plain text producer")
346 cm = kafka.ConfigMap{
347 "bootstrap.servers": bootstrapserver,
348 "sasl.mechanism": "OAUTHBEARER",
349 "security.protocol": "SASL_PLAINTEXT",
353 p, err := kafka.NewProducer(&cm)
355 log.Error("Cannot create kafka producer,", err)
// StartJobXmlFileData is the per-type job loop for XML file-ready events. For
// each payload on dataInCh it acquires a slot from jobLimiterChan (the global
// concurrency cap) and spawns runXmlJob, which publishes results to
// dataOutChannel. topicList maps type ids to their output topics; fvolume and
// fsbucket identify the file volume and object-store bucket passed through to
// the job. A nil payload on dataInCh stops the loop; controlCh carries job
// commands.
362 func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutChannel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
// NOTE(review): "Type job" lacks the trailing space used by the other logs
// in this function ("Type job "), so typeId runs into the literal.
364 log.Info("Type job", typeId, " started")
365 topicList := make(map[string]string)
366 topicList[typeId] = "json-file-ready-kp"
367 topicList["PmData"] = "json-file-ready-kpadp"
371 case jobCtl := <-controlCh:
372 log.Debug("Type job ", typeId, " new cmd received ", jobCtl.Command)
373 switch jobCtl.Command {
375 //ignore cmd - handled by channel signal
378 case msg := <-dataInCh:
380 log.Info("Type job ", typeId, " stopped by channel signal - start_job_xml_file_data")
// Acquire a concurrency slot (blocks when parallelism_limiter jobs run).
385 jobLimiterChan <- struct{}{}
386 go runXmlJob(typeId, msg, "gz", dataOutChannel, topicList, jobLimiterChan, fvolume, fsbucket)
388 case <-time.After(1 * time.Second):
// runXmlJob processes one file-ready event: it unmarshals the
// XmlFileEventHeader from msg, converts the referenced XML file to json via
// miniocollector.XmlToJsonConv, marshals a FileDownloadedEvt, and sends one
// KafkaPayload per entry in topicList on dataOutChannel. The message key is the
// quoted source name. jobLimiterChan is the slot acquired by the caller
// (released on completion — release elided in this excerpt). Malformed input
// is logged and the message discarded.
// NOTE(review): outputCompression ("gz") is accepted but its use is not visible
// in this excerpt — confirm against the elided lines.
397 func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
402 var evtData dataTypes.XmlFileEventHeader
404 err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
406 log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err)
409 log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())
// Convert the XML file referenced by the event to json; returns the new name.
412 newFn := miniocollector.XmlToJsonConv(&evtData)
414 log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err)
417 log.Debug("Converted file to json: ", newFn, " time", time.Since(start).String())
419 var fde dataTypes.FileDownloadedEvt
421 j, err := jsoniter.Marshal(fde)
424 log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
// Key the output message by source name, JSON-quoted for downstream parsers.
429 msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
430 log.Debug("Marshal file-collect event ", time.Since(start).String())
431 log.Debug("Sending file-collect event to output topic(s)", len(topicList))
// Fan out: one payload per configured output topic.
432 for _, v := range topicList {
433 fmt.Println("Output Topic: " + v)
434 var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
437 dataOutChannel <- kmsg
// FetchToken obtains an OAuth2 access token via the client-credentials flow
// (creds_client_id / creds_client_secret against creds_service_url) and wraps
// it as a kafka.OAuthBearerToken for the SASL/OAUTHBEARER refresh callbacks in
// the reader and writer loops. Returns an error if the token request fails.
// NOTE(review): the full token (including the access token value) is logged at
// Debug level below — avoid enabling Debug in production, or redact these.
442 func FetchToken() (*kafka.OAuthBearerToken, error) {
443 log.Debug("Get token inline")
444 conf := &clientcredentials.Config{
445 ClientID: creds_client_id,
446 ClientSecret: creds_client_secret,
447 TokenURL: creds_service_url,
449 token, err := conf.Token(context.Background())
451 log.Warning("Cannot fetch access token: ", err)
// No SASL extensions are passed; the map is required by the token struct.
454 extensions := map[string]string{}
456 log.Debug("=====================================================")
457 log.Debug("token: ", token)
458 log.Debug("=====================================================")
459 log.Debug("TokenValue: ", token.AccessToken)
460 log.Debug("=====================================================")
461 log.Debug("Expiration: ", token.Expiry)
463 oauthBearerToken := kafka.OAuthBearerToken{
464 TokenValue: token.AccessToken,
466 Extensions: extensions,
469 return &oauthBearerToken, nil