// ========================LICENSE_START=================================
// Copyright (C) 2023: Nordix Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
package kafkacollector

import (
	"context"
	"fmt"
	"os"
	"time"

	"main/common/dataTypes"
	"main/components/miniocollector"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	jsoniter "github.com/json-iterator/go"
	log "github.com/sirupsen/logrus"
	"golang.org/x/oauth2/clientcredentials"
)
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
var bootstrapserver = os.Getenv("KAFKA_SERVER")
var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
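// jobLimiterChan acts as a counting semaphore shared by all job handlers: each
// job sends a token into the channel before starting and receives it back when
// it finishes, capping the number of concurrently running jobs.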
// Limiter - valid for all jobs
const parallelism_limiter = 100

var jobLimiterChan = make(chan struct{}, parallelism_limiter)
// noinspection GoCognitiveComplexity
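// Start_topic_reader creates a kafka consumer for the given topic and forwards
// each polled message to data_ch. Consumer creation and topic subscription are
// retried once per second until they succeed; an EXIT command on control_ch
// stops the reader and signals the job handler with a nil message.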
func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
	log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)

	var topic_ok = false
	var c *kafka.Consumer = nil

	for !topic_ok {
		select {
		case reader_ctrl := <-control_ch:
			if reader_ctrl.Command == "EXIT" {
				log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
				data_ch <- nil // Signal to job handler
				return
			}
		case <-time.After(1 * time.Second):
			if c == nil {
				c = create_kafka_consumer(type_id, gid, cid)
				if c == nil {
					log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
				} else {
					log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
				}
			}
			if c != nil && !topic_ok {
				err := c.SubscribeTopics([]string{topic}, nil)
				if err != nil {
					log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
				} else {
					log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
					topic_ok = true
				}
			}
		}
	}
	log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
	var event_chan = make(chan int)
	go func() {
		for {
			select {
			case evt := <-c.Events():
				switch evt.(type) {
				case kafka.OAuthBearerTokenRefresh:
					log.Debug("New consumer token needed: ", evt)
					token, err := Fetch_token()
					if err != nil {
						log.Warning("Cannot fetch token: ", err)
						c.SetOAuthBearerTokenFailure(err.Error())
					} else {
						setTokenError := c.SetOAuthBearerToken(*token)
						if setTokenError != nil {
							log.Warning("Cannot set token: ", setTokenError)
							c.SetOAuthBearerTokenFailure(setTokenError.Error())
						}
					}
				default:
					log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
				}
			case msg := <-event_chan:
				if msg == 0 {
					return
				}
			case <-time.After(1 * time.Second):
				// Continue to check the channels
			}
		}
	}()
	go func() {
		for {
			select {
			case reader_ctrl := <-control_ch:
				if reader_ctrl.Command == "EXIT" {
					event_chan <- 0 // Stop the event loop
					log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
					data_ch <- nil // Signal to job handler
					defer c.Close()
					return
				}
			case <-time.After(1 * time.Second):
				ev := c.Poll(1000)
				if ev == nil {
					log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
					continue
				}
				switch e := ev.(type) {
				case *kafka.Message:
					var kmsg dataTypes.KafkaPayload
					kmsg.Msg = e
					c.Commit()
					data_ch <- &kmsg
					log.Debug("Reader msg: ", &kmsg)
					log.Debug("Reader - data_ch ", data_ch)
				case kafka.Error:
					fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				case kafka.OAuthBearerTokenRefresh:
					log.Debug("New consumer token needed: ", ev)
					token, err := Fetch_token()
					if err != nil {
						log.Warning("Cannot fetch token: ", err)
						c.SetOAuthBearerTokenFailure(err.Error())
					} else {
						setTokenError := c.SetOAuthBearerToken(*token)
						if setTokenError != nil {
							log.Warning("Cannot set token: ", setTokenError)
							c.SetOAuthBearerTokenFailure(setTokenError.Error())
						}
					}
				default:
					fmt.Printf("Ignored %v\n", e)
				}
			}
		}
	}()
}
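// Start_topic_writer creates a kafka producer and publishes each KafkaPayload
// received on data_ch to the topic named in the payload. Failed sends are
// retried with a growing delay before the message is discarded; a nil payload
// on data_ch stops the writer.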
func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {
	var kafka_producer *kafka.Producer

	log.Info("Topic writer starting")

	// Wait for kafka producer to become available - and be prepared to exit the writer
	for kafka_producer == nil {
		select {
		case writer_ctl := <-control_ch:
			if writer_ctl.Command == "EXIT" {
				// ignore cmd
			}
		default:
			kafka_producer = start_producer()
			if kafka_producer == nil {
				log.Debug("Could not start kafka producer - retrying")
				time.Sleep(1 * time.Second)
			} else {
				log.Debug("Kafka producer started")
			}
		}
	}
	var event_chan = make(chan int)
	go func() {
		for {
			select {
			case evt := <-kafka_producer.Events():
				switch evt.(type) {
				case *kafka.Message:
					m := evt.(*kafka.Message)
					if m.TopicPartition.Error != nil {
						log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
					} else {
						log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
					}
				case kafka.Error:
					log.Debug("Dumping topic writer event, error: ", evt)
				case kafka.OAuthBearerTokenRefresh:
					log.Debug("New producer token needed: ", evt)
					token, err := Fetch_token()
					if err != nil {
						log.Warning("Cannot fetch token: ", err)
						kafka_producer.SetOAuthBearerTokenFailure(err.Error())
					} else {
						setTokenError := kafka_producer.SetOAuthBearerToken(*token)
						if setTokenError != nil {
							log.Warning("Cannot set token: ", setTokenError)
							kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
						}
					}
				default:
					log.Debug("Dumping topic writer event, unknown: ", evt)
				}
			case msg := <-event_chan:
				if msg == 0 {
					return
				}
			case <-time.After(1 * time.Second):
				// Continue to check the channels
			}
		}
	}()
	go func() {
		for {
			select {
			case writer_ctl := <-control_ch:
				if writer_ctl.Command == "EXIT" {
					// ignore - wait for channel signal
				}
			case kmsg := <-data_ch:
				if kmsg == nil {
					event_chan <- 0 // Stop the event loop
					log.Info("Topic writer stopped by channel signal - start_topic_writer")
					defer kafka_producer.Close()
					return
				}
				var err error
				msg_ok := false
				retries := 10
				for retry := 1; retry <= retries && !msg_ok; retry++ {
					err = kafka_producer.Produce(&kafka.Message{
						TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
						Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
					if err == nil {
						msg_ok = true
						log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
					} else {
						log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
						time.Sleep(time.Duration(retry) * time.Second)
					}
				}
				if !msg_ok {
					log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
				}
			case <-time.After(1000 * time.Millisecond):
				// Continue to check the channels
			}
		}
	}()
}
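// create_kafka_consumer creates a consumer with the given group and client ids,
// using a plain text or SASL/OAUTHBEARER configuration depending on whether
// credentials are configured in the environment.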
func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
	var cm kafka.ConfigMap
	if creds_grant_type == "" {
		log.Info("Creating kafka plain text consumer for type: ", type_id)
		cm = kafka.ConfigMap{
			"bootstrap.servers":  bootstrapserver,
			"group.id":           gid,
			"client.id":          cid,
			"auto.offset.reset":  "latest",
			"enable.auto.commit": false,
		}
	} else {
		log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
		cm = kafka.ConfigMap{
			"bootstrap.servers":  bootstrapserver,
			"group.id":           gid,
			"client.id":          cid,
			"auto.offset.reset":  "latest",
			"enable.auto.commit": false,
			"sasl.mechanism":     "OAUTHBEARER",
			"security.protocol":  "SASL_PLAINTEXT",
		}
	}
	c, err := kafka.NewConsumer(&cm)
	if err != nil {
		log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
		return nil
	}
	log.Info("Created kafka consumer for type: ", type_id, " OK")
	return c
}
// Start kafka producer
func start_producer() *kafka.Producer {
	log.Info("Creating kafka producer")

	var cm kafka.ConfigMap
	if creds_grant_type == "" {
		log.Info("Creating kafka plain text producer")
		cm = kafka.ConfigMap{
			"bootstrap.servers": bootstrapserver,
		}
	} else {
		log.Info("Creating kafka SASL plain text producer")
		cm = kafka.ConfigMap{
			"bootstrap.servers": bootstrapserver,
			"sasl.mechanism":    "OAUTHBEARER",
			"security.protocol": "SASL_PLAINTEXT",
		}
	}
	p, err := kafka.NewProducer(&cm)
	if err != nil {
		log.Error("Cannot create kafka producer, ", err)
		return nil
	}
	return p
}
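// Start_job_xml_file_data dispatches one run_xml_job goroutine per message
// received on data_in_ch, subject to the global job limiter. A nil message
// on data_in_ch stops the job handler.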
func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
	log.Info("Type job ", type_id, " started")

	topic_list := make(map[string]string)
	topic_list[type_id] = "json-file-ready-kp"
	topic_list["PmData"] = "json-file-ready-kpadp"

	for {
		select {
		case job_ctl := <-control_ch:
			log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
			switch job_ctl.Command {
			case "EXIT":
				// ignore cmd - handled by channel signal
			}
		case msg := <-data_in_ch:
			if msg == nil {
				log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
				return
			}
			jobLimiterChan <- struct{}{}
			go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)
		case <-time.After(1 * time.Second):
			// Continue to check the channels
		}
	}
}
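// run_xml_job converts one collected xml file to json, wraps the resulting
// filename in a FileDownloadedEvt, and sends it to each topic in topic_list
// via data_out_channel. The job slot taken from jobLimiterChan is released
// when the job ends.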
func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
	defer func() {
		<-jobLimiterChan // Release the job slot
	}()
	start := time.Now()

	var evt_data dataTypes.XmlFileEventHeader
	err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
	if err != nil {
		log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details: ", err)
		return
	}
	log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())

	start = time.Now()
	new_fn := miniocollector.Xml_to_json_conv(&evt_data)
	if err != nil { // NOTE: err is still the (nil) Unmarshal error here; check kept from the original flow
		log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
		return
	}
	log.Debug("Converted file to json: ", new_fn, " time: ", time.Since(start).String())

	start = time.Now()
	var fde dataTypes.FileDownloadedEvt
	fde.Filename = new_fn
	j, err := jsoniter.Marshal(fde)
	if err != nil {
		log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
		return
	}
	msg.Msg.Value = j
	msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
	log.Debug("Marshal file-collect event ", time.Since(start).String())
	log.Debug("Sending file-collect event to output topic(s): ", len(topic_list))
	for _, v := range topic_list {
		fmt.Println("Output Topic: " + v)
		var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
		kmsg.Msg = msg.Msg
		kmsg.Topic = v
		data_out_channel <- kmsg
	}
}
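// Fetch_token fetches an access token from the configured auth service using
// the OAuth2 client credentials grant and wraps it in a kafka.OAuthBearerToken,
// suitable for SetOAuthBearerToken on both consumers and producers.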
func Fetch_token() (*kafka.OAuthBearerToken, error) {
	log.Debug("Get token inline")
	conf := &clientcredentials.Config{
		ClientID:     creds_client_id,
		ClientSecret: creds_client_secret,
		TokenURL:     creds_service_url,
	}
	token, err := conf.Token(context.Background())
	if err != nil {
		log.Warning("Cannot fetch access token: ", err)
		return nil, err
	}
	extensions := map[string]string{}
	log.Debug("=====================================================")
	log.Debug("token: ", token)
	log.Debug("=====================================================")
	log.Debug("TokenValue: ", token.AccessToken)
	log.Debug("=====================================================")
	log.Debug("Expiration: ", token.Expiry)

	oauthBearerToken := kafka.OAuthBearerToken{
		TokenValue: token.AccessToken,
		Expiration: token.Expiry,
		Extensions: extensions,
	}

	return &oauthBearerToken, nil
}