Fixing Sonar Code Smells for PMConverter
[nonrtric/plt/ranpm.git] / pm-file-converter/components/kafkacollector/kafkacollector.go
index 7451dd2..a7eaebf 100644
@@ -43,22 +43,26 @@ var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
 const parallelism_limiter = 100 //For all jobs
 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
 
-// noinspection GoCognitiveComplexity
-func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
+const typeLabel = " for type: "
+const fetchTokenErrorMessage = "Cannot fetch token: "
+const setTokenErrorMessage = "Cannot set token: "
 
-       log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
+// This function intentionally has high cognitive complexity // NOSONAR
+func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
 
-       topic_ok := false
+       log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
+
+       topicOk := false
        var c *kafka.Consumer = nil
        running := true
 
-       for topic_ok == false {
+       for !topicOk {
 
                select {
-               case reader_ctrl := <-control_ch:
-                       if reader_ctrl.Command == "EXIT" {
-                               log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
-                               data_ch <- nil //Signal to job handler
+               case readerCtrl := <-controlCh:
+                       if readerCtrl.Command == "EXIT" {
+                               log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+                               dataCh <- nil //Signal to job handler
                                running = false
                                return
                        }
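The `jobLimiterChan` context lines in this hunk rely on the buffered-channel counting-semaphore idiom that the rest of the file builds on. A minimal, self-contained sketch of that pattern (illustrative names and limit, not code from this commit):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// A buffered channel used as a counting semaphore, as with
	// jobLimiterChan above: a send acquires a slot, a receive frees it.
	const limit = 3 // illustrative; the file uses parallelism_limiter = 100
	sem := make(chan struct{}, limit)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		sem <- struct{}{} // blocks while `limit` jobs are in flight
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot, as runXmlJob does
			fmt.Println("job", n)
		}(i)
	}
	wg.Wait()
}
```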
@@ -67,27 +71,27 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
                                return
                        }
                        if c == nil {
-                               c = create_kafka_consumer(type_id, gid, cid)
+                               c = createKafkaConsumer(typeId, gid, cid)
                                if c == nil {
-                                       log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
+                                       log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
                                } else {
-                                       log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
+                                       log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
                                }
                        }
-                       if c != nil && topic_ok == false {
+                       if c != nil && !topicOk {
                                err := c.SubscribeTopics([]string{topic}, nil)
                                if err != nil {
-                                       log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying --  error details: ", err)
+                                       log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying --  error details: ", err)
                                } else {
-                                       log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
-                                       topic_ok = true
+                                       log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
+                                       topicOk = true
                                }
                        }
                }
        }
-       log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
+       log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
 
-       var event_chan = make(chan int)
+       var eventChan = make(chan int)
        go func() {
                for {
                        select {
@@ -95,22 +99,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
                                switch evt.(type) {
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", evt)
-                                       token, err := Fetch_token()
+                                       token, err := FetchToken()
                                        if err != nil {
-                                               log.Warning("Cannot cannot fetch token: ", err)
+                                               log.Warning(fetchTokenErrorMessage, err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
-                                                       log.Warning("Cannot cannot set token: ", setTokenError)
+                                                       log.Warning(setTokenErrorMessage, setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
-                                       log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
+                                       log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
                                }
 
-                       case msg := <-event_chan:
+                       case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
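The token-refresh block in this hunk is repeated almost verbatim in the consumer poll loop and in the writer below. A possible follow-up refactoring, not part of this commit (`tokenSetter` and `refreshToken` are hypothetical names), could hoist it behind the small interface that both confluent-kafka-go client types already satisfy:

```go
// Hypothetical helper, not in this commit. Both *kafka.Consumer and
// *kafka.Producer in confluent-kafka-go expose the two methods below,
// so one function can serve all three event loops. Assumes this file's
// FetchToken, log, and the error-message constants introduced above.
type tokenSetter interface {
	SetOAuthBearerToken(token kafka.OAuthBearerToken) error
	SetOAuthBearerTokenFailure(errstr string) error
}

func refreshToken(client tokenSetter) {
	token, err := FetchToken()
	if err != nil {
		log.Warning(fetchTokenErrorMessage, err)
		client.SetOAuthBearerTokenFailure(err.Error())
		return
	}
	if setTokenError := client.SetOAuthBearerToken(*token); setTokenError != nil {
		log.Warning(setTokenErrorMessage, setTokenError)
		client.SetOAuthBearerTokenFailure(setTokenError.Error())
	}
}
```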
@@ -126,11 +130,11 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
                for {
                        for {
                                select {
-                               case reader_ctrl := <-control_ch:
-                                       if reader_ctrl.Command == "EXIT" {
-                                               event_chan <- 0
-                                               log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
-                                               data_ch <- nil //Signal to job handler
+                               case readerCtrl := <-controlCh:
+                                       if readerCtrl.Command == "EXIT" {
+                                               eventChan <- 0
+                                               log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+                                               dataCh <- nil //Signal to job handler
                                                defer c.Close()
                                                return
                                        }
@@ -138,7 +142,7 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
 
                                        ev := c.Poll(1000)
                                        if ev == nil {
-                                               log.Debug("Topic Reader for type: ", type_id, "  Nothing to consume on topic: ", topic)
+                                               log.Debug("Topic Reader for type: ", typeId, "  Nothing to consume on topic: ", topic)
                                                continue
                                        }
                                        switch e := ev.(type) {
@@ -148,22 +152,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
 
                                                c.Commit()
 
-                                               data_ch <- &kmsg
+                                               dataCh <- &kmsg
                                                log.Debug("Reader msg: ", &kmsg)
-                                               log.Debug("Reader - data_ch ", data_ch)
+                                               log.Debug("Reader - data_ch ", dataCh)
                                        case kafka.Error:
                                                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
 
                                        case kafka.OAuthBearerTokenRefresh:
                                                log.Debug("New consumer token needed: ", ev)
-                                               token, err := Fetch_token()
+                                               token, err := FetchToken()
                                                if err != nil {
-                                                       log.Warning("Cannot cannot fetch token: ", err)
+                                                       log.Warning(fetchTokenErrorMessage, err)
                                                        c.SetOAuthBearerTokenFailure(err.Error())
                                                } else {
                                                        setTokenError := c.SetOAuthBearerToken(*token)
                                                        if setTokenError != nil {
-                                                               log.Warning("Cannot cannot set token: ", setTokenError)
+                                                               log.Warning(setTokenErrorMessage, setTokenError)
                                                                c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                        }
                                                }
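For orientation, the poll loop in this hunk follows the standard manual-commit shape in confluent-kafka-go. A generic sketch of that shape (`process` is a hypothetical handler, not code from this commit):

```go
// With "enable.auto.commit": false (set in createKafkaConsumer below),
// offsets only advance on the explicit Commit(), giving at-least-once
// handover to the data channel.
for running {
	ev := c.Poll(1000) // 1 s timeout
	if ev == nil {
		continue // nothing consumed this interval
	}
	if m, ok := ev.(*kafka.Message); ok {
		process(m) // hand the payload off for conversion
		c.Commit() // mark the message as consumed
	}
}
```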
@@ -176,23 +180,24 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
        }()
 }
 
-func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {
+// This function intentionally has high cognitive complexity // NOSONAR
+func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
 
-       var kafka_producer *kafka.Producer
+       var kafkaProducer *kafka.Producer
 
        running := true
        log.Info("Topic writer starting")
 
        // Wait for kafka producer to become available - and be prepared to exit the writer
-       for kafka_producer == nil {
+       for kafkaProducer == nil {
                select {
-               case writer_ctl := <-control_ch:
-                       if writer_ctl.Command == "EXIT" {
+               case writerCtl := <-controlCh:
+                       if writerCtl.Command == "EXIT" {
                                //ignore cmd
                        }
                default:
-                       kafka_producer = start_producer()
-                       if kafka_producer == nil {
+                       kafkaProducer = startProducer()
+                       if kafkaProducer == nil {
                                log.Debug("Could not start kafka producer - retrying")
                                time.Sleep(1 * time.Second)
                        } else {
@@ -201,11 +206,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
                }
        }
 
-       var event_chan = make(chan int)
+       var eventChan = make(chan int)
        go func() {
                for {
                        select {
-                       case evt := <-kafka_producer.Events():
+                       case evt := <-kafkaProducer.Events():
                                switch evt.(type) {
                                case *kafka.Message:
                                        m := evt.(*kafka.Message)
@@ -219,22 +224,22 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
                                        log.Debug("Dumping topic writer event, error: ", evt)
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New producer token needed: ", evt)
-                                       token, err := Fetch_token()
+                                       token, err := FetchToken()
                                        if err != nil {
-                                               log.Warning("Cannot cannot fetch token: ", err)
-                                               kafka_producer.SetOAuthBearerTokenFailure(err.Error())
+                                               log.Warning(fetchTokenErrorMessage, err)
+                                               kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
-                                               setTokenError := kafka_producer.SetOAuthBearerToken(*token)
+                                               setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
-                                                       log.Warning("Cannot cannot set token: ", setTokenError)
-                                                       kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
+                                                       log.Warning(setTokenErrorMessage, setTokenError)
+                                                       kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic writer event, unknown: ", evt)
                                }
 
-                       case msg := <-event_chan:
+                       case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
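The `*kafka.Message` case in this hunk is a delivery report; its body is elided from the diff. Generic confluent-kafka-go delivery-report handling, sketched as a standalone helper (`handleProducerEvent` is a hypothetical name, not part of this commit):

```go
func handleProducerEvent(evt kafka.Event) {
	switch e := evt.(type) {
	case *kafka.Message:
		// Per-message delivery report: failure is reported via the
		// message's TopicPartition.Error field.
		if e.TopicPartition.Error != nil {
			log.Warning("Delivery failed: ", e.TopicPartition.Error)
		} else {
			log.Debug("Message delivered to topic: ", *e.TopicPartition.Topic)
		}
	case kafka.Error:
		log.Warning("Producer error: ", e)
	}
}
```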
@@ -248,36 +253,36 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
        go func() {
                for {
                        select {
-                       case writer_ctl := <-control_ch:
-                               if writer_ctl.Command == "EXIT" {
+                       case writerCtl := <-controlCh:
+                               if writerCtl.Command == "EXIT" {
                                        // ignore - wait for channel signal
                                }
 
-                       case kmsg := <-data_ch:
+                       case kmsg := <-dataCh:
                                if kmsg == nil {
-                                       event_chan <- 0
+                                       eventChan <- 0
                                        log.Info("Topic writer stopped by channel signal - start_topic_writer")
-                                       defer kafka_producer.Close()
+                                       defer kafkaProducer.Close()
                                        return
                                }
 
                                retries := 10
-                               msg_ok := false
+                               msgOk := false
                                var err error
-                               for retry := 1; retry <= retries && msg_ok == false; retry++ {
-                                       err = kafka_producer.Produce(&kafka.Message{
+                               for retry := 1; retry <= retries && !msgOk; retry++ {
+                                       err = kafkaProducer.Produce(&kafka.Message{
                                                TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
                                                Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
 
                                        if err == nil {
-                                               msg_ok = true
+                                               msgOk = true
                                                log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
                                        } else {
                                                log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
                                                time.Sleep(time.Duration(retry) * time.Second)
                                        }
                                }
-                               if !msg_ok {
+                               if !msgOk {
                                        log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
                                }
                        case <-time.After(1000 * time.Millisecond):
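The retry loop above uses a linear backoff, sleeping one second longer per attempt. The same logic as a standalone sketch (`produceWithRetry` is a hypothetical helper, not part of this commit):

```go
func produceWithRetry(p *kafka.Producer, msg *kafka.Message, maxRetries int) error {
	var err error
	for retry := 1; retry <= maxRetries; retry++ {
		if err = p.Produce(msg, nil); err == nil {
			return nil
		}
		time.Sleep(time.Duration(retry) * time.Second) // back off a little longer each attempt
	}
	return err // exhausted; caller discards the message
}
```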
@@ -289,10 +294,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
        }()
 }
 
-func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
+// This function intentionally has high cognitive complexity // NOSONAR
+func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
        var cm kafka.ConfigMap
        if creds_grant_type == "" {
-               log.Info("Creating kafka plain text consumer for type: ", type_id)
+               log.Info("Creating kafka plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
@@ -301,7 +307,7 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum
                        "enable.auto.commit": false,
                }
        } else {
-               log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
+               log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
@@ -315,16 +321,17 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum
        c, err := kafka.NewConsumer(&cm)
 
        if err != nil {
-               log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
+               log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
                return nil
        }
 
-       log.Info("Created kafka consumer for type: ", type_id, " OK")
+       log.Info("Created kafka consumer for type: ", typeId, " OK")
        return c
 }
 
 // Start kafka producer
-func start_producer() *kafka.Producer {
+// NOSONAR
+func startProducer() *kafka.Producer {
        log.Info("Creating kafka producer")
 
        var cm kafka.ConfigMap
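The SASL branches of both ConfigMaps are elided from the hunks above. For reference, an OAUTHBEARER setup in librdkafka/confluent-kafka-go typically looks like this (illustrative keys and values, not the hidden lines of this file):

```go
cm := kafka.ConfigMap{
	"bootstrap.servers": bootstrapserver,
	"security.protocol": "SASL_PLAINTEXT",
	"sasl.mechanism":    "OAUTHBEARER",
}
// Token refresh is then driven by kafka.OAuthBearerTokenRefresh events,
// as in the event loops above.
p, err := kafka.NewProducer(&cm)
```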
@@ -350,18 +357,18 @@ func start_producer() *kafka.Producer {
        return p
 }
 
-func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
 
        log.Info("Type job", type_id, " started")
-       topic_list := make(map[string]string)
-       topic_list[type_id] = "json-file-ready-kp"
-       topic_list["PmData"] = "json-file-ready-kpadp"
+       topicList := make(map[string]string)
+       topicList[type_id] = "json-file-ready-kp"
+       topicList["PmData"] = "json-file-ready-kpadp"
        running := true
        for {
                select {
-               case job_ctl := <-control_ch:
-                       log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
-                       switch job_ctl.Command {
+               case jobCtl := <-control_ch:
+                       log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
+                       switch jobCtl.Command {
                        case "EXIT":
                                //ignore cmd - handled by channel signal
                        }
@@ -374,7 +381,7 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro
                                return
                        }
                        jobLimiterChan <- struct{}{}
-                       go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)
+                       go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
 
                case <-time.After(1 * time.Second):
                        if !running {
@@ -384,30 +391,31 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro
        }
 }
 
-func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
+// This function intentionally takes more parameters than the Sonar limit, for legacy compatibility // NOSONAR
+func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
        defer func() {
                <-jobLimiterChan
        }()
        start := time.Now()
-       var evt_data dataTypes.XmlFileEventHeader
+       var evtData dataTypes.XmlFileEventHeader
 
-       err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
+       err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
        if err != nil {
-               log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
+               log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err)
                return
        }
-       log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
+       log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())
 
        start = time.Now()
-       new_fn := miniocollector.Xml_to_json_conv(&evt_data)
+       newFn := miniocollector.XmlToJsonConv(&evtData)
        if err != nil {
-               log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
+               log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err)
                return
        }
-       log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String())
+       log.Debug("Converted file to json: ", newFn, " time", time.Since(start).String())
 
        var fde dataTypes.FileDownloadedEvt
-       fde.Filename = new_fn
+       fde.Filename = newFn
        j, err := jsoniter.Marshal(fde)
 
        if err != nil {
@@ -416,19 +424,20 @@ func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression
        }
        msg.Msg.Value = j
 
-       msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
+       msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
        log.Debug("Marshal file-collect event ", time.Since(start).String())
-       log.Debug("Sending file-collect event to output topic(s)", len(topic_list))
-       for _, v := range topic_list {
+       log.Debug("Sending file-collect event to output topic(s)", len(topicList))
+       for _, v := range topicList {
                fmt.Println("Output Topic: " + v)
                var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
                kmsg.Msg = msg.Msg
                kmsg.Topic = v
-               data_out_channel <- kmsg
+               dataOutChannel <- kmsg
        }
 }
 
-func Fetch_token() (*kafka.OAuthBearerToken, error) {
+// NOSONAR
+func FetchToken() (*kafka.OAuthBearerToken, error) {
        log.Debug("Get token inline")
        conf := &clientcredentials.Config{
                ClientID:     creds_client_id,
@@ -441,6 +450,7 @@ func Fetch_token() (*kafka.OAuthBearerToken, error) {
                return nil, err
        }
        extensions := map[string]string{}
+
        log.Debug("=====================================================")
        log.Debug("token: ", token)
        log.Debug("=====================================================")