Fix Sonar code smells for PMConverter 57/11857/2
author ambrishest <ambrish.singh@est.tech>
Wed, 4 Oct 2023 15:47:32 +0000 (16:47 +0100)
committer ambrishest <ambrish.singh@est.tech>
Fri, 6 Oct 2023 13:14:57 +0000 (14:14 +0100)
Issue-ID: NONRTRIC-879
Signed-off-by: ambrishest <ambrish.singh@est.tech>
Change-Id: I59b81fa264a55a7432f0b9669309169c9ca55e36
pm-file-converter/common/utils/utils.go
pm-file-converter/common/utils/utils_test.go
pm-file-converter/components/kafkacollector/kafkacollector.go
pm-file-converter/components/miniocollector/miniocollector.go
pm-file-converter/components/miniocollector/miniocollector_test.go
pm-file-converter/components/xmltransform/xmltransform.go
pm-file-converter/components/xmltransform/xmltransform_test.go
pm-file-converter/main.go

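The change addresses two recurring Sonar findings across the converter: exported snake_case identifiers are renamed to Go's MixedCaps convention (so the //lint:ignore S100 and S117 suppressions can be dropped), and string literals repeated across log calls are hoisted into package constants; functions whose complexity is accepted are annotated with NOSONAR instead. A minimal, self-contained sketch of both fixes follows; the names here are illustrative, not taken from the repo:

package main

import "log"

// A literal repeated in several log calls is hoisted into one constant,
// as the commit does with typeLabel, fetchTokenErrorMessage and
// setTokenErrorMessage.
const typeLabel = " for type: "

// MixedCaps instead of start_status_reporter, so no naming suppression
// (//lint:ignore S100) is needed.
func StartStatusReporter(topic string, typeId string) {
	log.Println("reader starting, topic: ", topic, typeLabel, typeId)
	log.Println("reader ready, topic: ", topic, typeLabel, typeId)
}

func main() {
	StartStatusReporter("pm-topic", "xml-file-data")
}
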
diff --git a/pm-file-converter/common/utils/utils.go b/pm-file-converter/common/utils/utils.go
index dccf3c5..7be5819 100644
@@ -30,9 +30,7 @@ import (
 var httpclient = &http.Client{}
 
 // Send a http request with json (json may be nil)
-//
-//lint:ignore S100
-func Send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
+func SendHttpRequest(json []byte, method string, url string, retry bool, useAuth bool) bool {
 
        // set the HTTP method, url, and request body
        var req *http.Request
@@ -49,7 +47,7 @@ func Send_http_request(json []byte, method string, url string, retry bool, useAu
        }
 
        if useAuth {
-               token, err := kafkacollector.Fetch_token()
+               token, err := kafkacollector.FetchToken()
                if err != nil {
                        log.Error("Cannot fetch token for http request: ", err)
                        return false
diff --git a/pm-file-converter/common/utils/utils_test.go b/pm-file-converter/common/utils/utils_test.go
index 2cf3f1f..166a27e 100644
@@ -20,7 +20,7 @@ func TestSend_http_request(t *testing.T) {
 
        validate := func(t *testing.T, tc *testCase) {
                t.Run(tc.Name, func(t *testing.T) {
-                       actualBool := Send_http_request(tc.Json, tc.Method, tc.Url, tc.Retry, tc.UseAuth)
+                       actualBool := SendHttpRequest(tc.Json, tc.Method, tc.Url, tc.Retry, tc.UseAuth)
 
                        assert.Equal(t, tc.ExpectedBool, actualBool)
                })
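Renaming an exported function breaks any out-of-tree caller; the commit simply updates every in-repo call site. If external compatibility had mattered, a deprecated wrapper could have bridged the rename. A sketch of that option, which this change deliberately does not take (Sonar would flag the old name again):

// Deprecated: use SendHttpRequest instead.
func Send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
	return SendHttpRequest(json, method, url, retry, useAuth)
}
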
diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go
index 7451dd2..a7eaebf 100644
@@ -43,22 +43,26 @@ var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
 const parallelism_limiter = 100 //For all jobs
 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
 
-// noinspection GoCognitiveComplexity
-func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
+const typeLabel = " for type: "
+const fetchTokenErrorMessage = "Cannot fetch token: "
+const setTokenErrorMessage = "Cannot set token: "
 
-       log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
+// This function intentionally has high cognitive complexity // NOSONAR
+func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
 
-       topic_ok := false
+       log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
+
+       topicOk := false
        var c *kafka.Consumer = nil
        running := true
 
-       for topic_ok == false {
+       for topicOk == false {
 
                select {
-               case reader_ctrl := <-control_ch:
-                       if reader_ctrl.Command == "EXIT" {
-                               log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
-                               data_ch <- nil //Signal to job handler
+               case readerCtrl := <-controlCh:
+                       if readerCtrl.Command == "EXIT" {
+                               log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+                               dataCh <- nil //Signal to job handler
                                running = false
                                return
                        }
@@ -67,27 +71,27 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
                                return
                        }
                        if c == nil {
-                               c = create_kafka_consumer(type_id, gid, cid)
+                               c = createKafkaConsumer(typeId, gid, cid)
                                if c == nil {
-                                       log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
+                                       log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
                                } else {
-                                       log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
+                                       log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
                                }
                        }
-                       if c != nil && topic_ok == false {
+                       if c != nil && topicOk == false {
                                err := c.SubscribeTopics([]string{topic}, nil)
                                if err != nil {
-                                       log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying --  error details: ", err)
+                                       log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying --  error details: ", err)
                                } else {
-                                       log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
-                                       topic_ok = true
+                                       log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
+                                       topicOk = true
                                }
                        }
                }
        }
-       log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
+       log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
 
-       var event_chan = make(chan int)
+       var eventChan = make(chan int)
        go func() {
                for {
                        select {
@@ -95,22 +99,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
                                switch evt.(type) {
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New consumer token needed: ", evt)
-                                       token, err := Fetch_token()
+                                       token, err := FetchToken()
                                        if err != nil {
-                                               log.Warning("Cannot cannot fetch token: ", err)
+                                               log.Warning(fetchTokenErrorMessage, err)
                                                c.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
                                                setTokenError := c.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
-                                                       log.Warning("Cannot cannot set token: ", setTokenError)
+                                                       log.Warning(setTokenErrorMessage, setTokenError)
                                                        c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
-                                       log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
+                                       log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
                                }
 
-                       case msg := <-event_chan:
+                       case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
@@ -126,11 +130,11 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
                for {
                        for {
                                select {
-                               case reader_ctrl := <-control_ch:
-                                       if reader_ctrl.Command == "EXIT" {
-                                               event_chan <- 0
-                                               log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
-                                               data_ch <- nil //Signal to job handler
+                               case readerCtrl := <-controlCh:
+                                       if readerCtrl.Command == "EXIT" {
+                                               eventChan <- 0
+                                               log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+                                               dataCh <- nil //Signal to job handler
                                                defer c.Close()
                                                return
                                        }
@@ -138,7 +142,7 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
 
                                        ev := c.Poll(1000)
                                        if ev == nil {
-                                               log.Debug("Topic Reader for type: ", type_id, "  Nothing to consume on topic: ", topic)
+                                               log.Debug("Topic Reader for type: ", typeId, "  Nothing to consume on topic: ", topic)
                                                continue
                                        }
                                        switch e := ev.(type) {
@@ -148,22 +152,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
 
                                                c.Commit()
 
-                                               data_ch <- &kmsg
+                                               dataCh <- &kmsg
                                                log.Debug("Reader msg: ", &kmsg)
-                                               log.Debug("Reader - data_ch ", data_ch)
+                                               log.Debug("Reader - data_ch ", dataCh)
                                        case kafka.Error:
                                                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
 
                                        case kafka.OAuthBearerTokenRefresh:
                                                log.Debug("New consumer token needed: ", ev)
-                                               token, err := Fetch_token()
+                                               token, err := FetchToken()
                                                if err != nil {
-                                                       log.Warning("Cannot cannot fetch token: ", err)
+                                                       log.Warning(fetchTokenErrorMessage, err)
                                                        c.SetOAuthBearerTokenFailure(err.Error())
                                                } else {
                                                        setTokenError := c.SetOAuthBearerToken(*token)
                                                        if setTokenError != nil {
-                                                               log.Warning("Cannot cannot set token: ", setTokenError)
+                                                               log.Warning(setTokenErrorMessage, setTokenError)
                                                                c.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                        }
                                                }
@@ -176,23 +180,24 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.
        }()
 }
 
-func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {
+// This function intentionally has high cognitive complexity // NOSONAR
+func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
 
-       var kafka_producer *kafka.Producer
+       var kafkaProducer *kafka.Producer
 
        running := true
        log.Info("Topic writer starting")
 
        // Wait for kafka producer to become available - and be prepared to exit the writer
-       for kafka_producer == nil {
+       for kafkaProducer == nil {
                select {
-               case writer_ctl := <-control_ch:
-                       if writer_ctl.Command == "EXIT" {
+               case writerCtl := <-controlCh:
+                       if writerCtl.Command == "EXIT" {
                                //ignore cmd
                        }
                default:
-                       kafka_producer = start_producer()
-                       if kafka_producer == nil {
+                       kafkaProducer = startProducer()
+                       if kafkaProducer == nil {
                                log.Debug("Could not start kafka producer - retrying")
                                time.Sleep(1 * time.Second)
                        } else {
@@ -201,11 +206,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
                }
        }
 
-       var event_chan = make(chan int)
+       var eventChan = make(chan int)
        go func() {
                for {
                        select {
-                       case evt := <-kafka_producer.Events():
+                       case evt := <-kafkaProducer.Events():
                                switch evt.(type) {
                                case *kafka.Message:
                                        m := evt.(*kafka.Message)
@@ -219,22 +224,22 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
                                        log.Debug("Dumping topic writer event, error: ", evt)
                                case kafka.OAuthBearerTokenRefresh:
                                        log.Debug("New producer token needed: ", evt)
-                                       token, err := Fetch_token()
+                                       token, err := FetchToken()
                                        if err != nil {
-                                               log.Warning("Cannot cannot fetch token: ", err)
-                                               kafka_producer.SetOAuthBearerTokenFailure(err.Error())
+                                               log.Warning(fetchTokenErrorMessage, err)
+                                               kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
                                        } else {
-                                               setTokenError := kafka_producer.SetOAuthBearerToken(*token)
+                                               setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
                                                if setTokenError != nil {
-                                                       log.Warning("Cannot cannot set token: ", setTokenError)
-                                                       kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
+                                                       log.Warning(setTokenErrorMessage, setTokenError)
+                                                       kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
                                                }
                                        }
                                default:
                                        log.Debug("Dumping topic writer event, unknown: ", evt)
                                }
 
-                       case msg := <-event_chan:
+                       case msg := <-eventChan:
                                if msg == 0 {
                                        return
                                }
@@ -248,36 +253,36 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
        go func() {
                for {
                        select {
-                       case writer_ctl := <-control_ch:
-                               if writer_ctl.Command == "EXIT" {
+                       case writerCtl := <-controlCh:
+                               if writerCtl.Command == "EXIT" {
                                        // ignore - wait for channel signal
                                }
 
-                       case kmsg := <-data_ch:
+                       case kmsg := <-dataCh:
                                if kmsg == nil {
-                                       event_chan <- 0
+                                       eventChan <- 0
                                        log.Info("Topic writer stopped by channel signal - start_topic_writer")
-                                       defer kafka_producer.Close()
+                                       defer kafkaProducer.Close()
                                        return
                                }
 
                                retries := 10
-                               msg_ok := false
+                               msgOk := false
                                var err error
-                               for retry := 1; retry <= retries && msg_ok == false; retry++ {
-                                       err = kafka_producer.Produce(&kafka.Message{
+                               for retry := 1; retry <= retries && msgOk == false; retry++ {
+                                       err = kafkaProducer.Produce(&kafka.Message{
                                                TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
                                                Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
 
                                        if err == nil {
-                                               msg_ok = true
+                                               msgOk = true
                                                log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
                                        } else {
                                                log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
                                                time.Sleep(time.Duration(retry) * time.Second)
                                        }
                                }
-                               if !msg_ok {
+                               if !msgOk {
                                        log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
                                }
                        case <-time.After(1000 * time.Millisecond):
@@ -289,10 +294,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d
        }()
 }
 
-func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
+// This function intentionally has high cognitive complexity // NOSONAR
+func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
        var cm kafka.ConfigMap
        if creds_grant_type == "" {
-               log.Info("Creating kafka plain text consumer for type: ", type_id)
+               log.Info("Creating kafka plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
@@ -301,7 +307,7 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum
                        "enable.auto.commit": false,
                }
        } else {
-               log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
+               log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
                cm = kafka.ConfigMap{
                        "bootstrap.servers":  bootstrapserver,
                        "group.id":           gid,
@@ -315,16 +321,17 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum
        c, err := kafka.NewConsumer(&cm)
 
        if err != nil {
-               log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
+               log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
                return nil
        }
 
-       log.Info("Created kafka consumer for type: ", type_id, " OK")
+       log.Info("Created kafka consumer for type: ", typeId, " OK")
        return c
 }
 
 // Start kafka producer
-func start_producer() *kafka.Producer {
+// NOSONAR
+func startProducer() *kafka.Producer {
        log.Info("Creating kafka producer")
 
        var cm kafka.ConfigMap
@@ -350,18 +357,18 @@ func start_producer() *kafka.Producer {
        return p
 }
 
-func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
 
        log.Info("Type job", type_id, " started")
-       topic_list := make(map[string]string)
-       topic_list[type_id] = "json-file-ready-kp"
-       topic_list["PmData"] = "json-file-ready-kpadp"
+       topicList := make(map[string]string)
+       topicList[type_id] = "json-file-ready-kp"
+       topicList["PmData"] = "json-file-ready-kpadp"
        running := true
        for {
                select {
-               case job_ctl := <-control_ch:
-                       log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
-                       switch job_ctl.Command {
+               case jobCtl := <-control_ch:
+                       log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
+                       switch jobCtl.Command {
                        case "EXIT":
                                //ignore cmd - handled by channel signal
                        }
@@ -374,7 +381,7 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro
                                return
                        }
                        jobLimiterChan <- struct{}{}
-                       go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)
+                       go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
 
                case <-time.After(1 * time.Second):
                        if !running {
@@ -384,30 +391,31 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro
        }
 }
 
-func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
+// This function intentionally has a long parameter list for legacy compatibility // NOSONAR
+func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
        defer func() {
                <-jobLimiterChan
        }()
        start := time.Now()
-       var evt_data dataTypes.XmlFileEventHeader
+       var evtData dataTypes.XmlFileEventHeader
 
-       err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
+       err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
        if err != nil {
-               log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
+               log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err)
                return
        }
-       log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
+       log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())
 
        start = time.Now()
-       new_fn := miniocollector.Xml_to_json_conv(&evt_data)
+       newFn := miniocollector.XmlToJsonConv(&evtData)
        if err != nil {
-               log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
+               log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err)
                return
        }
-       log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String())
+       log.Debug("Converted file to json: ", newFn, " time", time.Since(start).String())
 
        var fde dataTypes.FileDownloadedEvt
-       fde.Filename = new_fn
+       fde.Filename = newFn
        j, err := jsoniter.Marshal(fde)
 
        if err != nil {
@@ -416,19 +424,20 @@ func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression
        }
        msg.Msg.Value = j
 
-       msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
+       msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
        log.Debug("Marshal file-collect event ", time.Since(start).String())
-       log.Debug("Sending file-collect event to output topic(s)", len(topic_list))
-       for _, v := range topic_list {
+       log.Debug("Sending file-collect event to output topic(s)", len(topicList))
+       for _, v := range topicList {
                fmt.Println("Output Topic: " + v)
                var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
                kmsg.Msg = msg.Msg
                kmsg.Topic = v
-               data_out_channel <- kmsg
+               dataOutChannel <- kmsg
        }
 }
 
-func Fetch_token() (*kafka.OAuthBearerToken, error) {
+// NOSONAR
+func FetchToken() (*kafka.OAuthBearerToken, error) {
        log.Debug("Get token inline")
        conf := &clientcredentials.Config{
                ClientID:     creds_client_id,
@@ -441,6 +450,7 @@ func Fetch_token() (*kafka.OAuthBearerToken, error) {
                return nil, err
        }
        extensions := map[string]string{}
+
        log.Debug("=====================================================")
        log.Debug("token: ", token)
        log.Debug("=====================================================")
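After this change the OAuthBearerTokenRefresh handling still appears three times (twice in the reader, once in the writer) with identical logic; only the message strings were deduplicated. A possible further step, sketched here and not part of this commit, is to factor the whole block into one helper. tokenSetter is a hypothetical local interface that both *kafka.Consumer and *kafka.Producer already satisfy; the sketch assumes this package's existing imports (confluent-kafka-go as kafka, logrus as log).

// Hypothetical helper in package kafkacollector; it reuses FetchToken and
// the error-message constants introduced by this commit.
type tokenSetter interface {
	SetOAuthBearerToken(kafka.OAuthBearerToken) error
	SetOAuthBearerTokenFailure(string) error
}

func handleTokenRefresh(h tokenSetter) {
	token, err := FetchToken()
	if err != nil {
		log.Warning(fetchTokenErrorMessage, err)
		h.SetOAuthBearerTokenFailure(err.Error())
		return
	}
	if setTokenErr := h.SetOAuthBearerToken(*token); setTokenErr != nil {
		log.Warning(setTokenErrorMessage, setTokenErr)
		h.SetOAuthBearerTokenFailure(setTokenErr.Error())
	}
}
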
diff --git a/pm-file-converter/components/miniocollector/miniocollector.go b/pm-file-converter/components/miniocollector/miniocollector.go
index ac8ce05..24ff41f 100644
@@ -39,7 +39,7 @@ import (
 )
 
 // nolint
-func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
+func XmlToJsonConv(evtData *dataTypes.XmlFileEventHeader) string {
        filestoreUser := os.Getenv("FILESTORE_USER")
        filestorePwd := os.Getenv("FILESTORE_PWD")
        filestoreServer := os.Getenv("FILESTORE_SERVER")
@@ -52,12 +52,12 @@ func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
                log.Fatalln(err)
        }
        expiry := time.Second * 24 * 60 * 60 // 1 day.
-       objectName := evt_data.Name
-       bucketName := evt_data.ObjectStoreBucket
-       compresion := evt_data.Compression
+       objectName := evtData.Name
+       bucketName := evtData.ObjectStoreBucket
+       compression := evtData.Compression
        reqParams := make(url.Values)
 
-       xmlh, err := jsoniter.Marshal(evt_data)
+       xmlh, err := jsoniter.Marshal(evtData)
        if err != nil {
                fmt.Printf("Error: %s", err)
                return ""
@@ -68,18 +68,18 @@ func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
        if err != nil {
                log.Fatalln(err)
        }
-       file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
+       fileBytes := xmltransform.Convert(presignedURL.String(), compression, string(xmlh))
        newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
        var buf bytes.Buffer
-       err = gzipWrite(&buf, &file_bytes)
-       upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
+       err = gzipWrite(&buf, &fileBytes)
+       uploadObject(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
        fmt.Println("")
 
        return newObjectName
 }
 
 // nolint
-func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) {
+func uploadObject(mc *minio.Client, b []byte, objectName string, fsbucket string) {
        contentType := "application/json"
        if strings.HasSuffix(objectName, ".gz") {
                contentType = "application/gzip"
@@ -88,8 +88,8 @@ func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket strin
        // Upload the xml file with PutObject
        r := bytes.NewReader(b)
        tctx := context.Background()
-       if check_minio_bucket(mc, fsbucket) == false {
-               err := create_minio_bucket(mc, fsbucket)
+       if checkMinioBucket(mc, fsbucket) == false {
+               err := createMinioBucket(mc, fsbucket)
                if err != nil {
                        log.Error("Cannot create bucket: ", fsbucket, ", ", err)
                        return
@@ -113,7 +113,7 @@ func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket strin
 }
 
 // nolint
-func create_minio_bucket(mc *minio.Client, bucket string) error {
+func createMinioBucket(mc *minio.Client, bucket string) error {
        tctx := context.Background()
        err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
        if err != nil {
@@ -132,7 +132,7 @@ func create_minio_bucket(mc *minio.Client, bucket string) error {
 }
 
 // nolint
-func check_minio_bucket(mc *minio.Client, bucket string) bool {
+func checkMinioBucket(mc *minio.Client, bucket string) bool {
        tctx := context.Background()
        exists, err := mc.BucketExists(tctx, bucket)
        if err == nil && exists {
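For reference, the renamed helpers compose into a create-if-missing upload path. A minimal sketch using only minio-go v7 calls already visible in this file (BucketExists/MakeBucket via the helpers, PutObject); ensureAndUpload itself is hypothetical glue, not code from this commit:

package miniocollector

import (
	"bytes"
	"context"

	"github.com/minio/minio-go/v7"
)

// Ensure the bucket exists, then upload the JSON payload.
func ensureAndUpload(mc *minio.Client, b []byte, objectName string, bucket string) error {
	if !checkMinioBucket(mc, bucket) {
		if err := createMinioBucket(mc, bucket); err != nil {
			return err
		}
	}
	_, err := mc.PutObject(context.Background(), bucket, objectName,
		bytes.NewReader(b), int64(len(b)),
		minio.PutObjectOptions{ContentType: "application/json"})
	return err
}
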
diff --git a/pm-file-converter/components/miniocollector/miniocollector_test.go b/pm-file-converter/components/miniocollector/miniocollector_test.go
index 17327a8..121799d 100644
@@ -33,7 +33,7 @@ func TestMake_minio_bucket(t *testing.T) {
 
        // Create a test bucket.
        bucketName := "my-test-bucket"
-       err = create_minio_bucket(minioClient, bucketName)
+       err = createMinioBucket(minioClient, bucketName)
        if err != nil {
                log.Fatalf("Error creating bucket: %v", err)
        } else {
@@ -61,7 +61,7 @@ func Test_bucket_cannot_empty(t *testing.T) {
 
        // Create a test bucket.
        bucketName := ""
-       err = create_minio_bucket(minioClient, bucketName)
+       err = createMinioBucket(minioClient, bucketName)
        if err != nil {
                assert.Error(t, err)
        } else {
@@ -91,7 +91,7 @@ func Test_check_minio_bucket(t *testing.T) {
 
        // Create a test bucket.
        bucketName := "my-test-bucket"
-       found = check_minio_bucket(minioClient, bucketName)
+       found = checkMinioBucket(minioClient, bucketName)
        if found {
                assert.True(t, found)
        } else {
@@ -121,7 +121,7 @@ func Test_bucket_not_exists(t *testing.T) {
 
        // Create a test bucket.
        bucketName := "my-test-bucket-not-exists"
-       found = check_minio_bucket(minioClient, bucketName)
+       found = checkMinioBucket(minioClient, bucketName)
        if found {
                assert.True(t, found)
        } else {
@@ -168,7 +168,7 @@ func Test_upload_object(t *testing.T) {
 
        // Create a test bucket.
        bucketName := "my-test-bucket"
-       upload_object(minioClient, file_bytes, "minio_upload_test.json", bucketName)
+       uploadObject(minioClient, file_bytes, "minio_upload_test.json", bucketName)
 
        assert.NoError(t, err)
 
diff --git a/pm-file-converter/components/xmltransform/xmltransform.go b/pm-file-converter/components/xmltransform/xmltransform.go
index e7e04e4..6e70022 100644
@@ -36,10 +36,10 @@ import (
 )
 
 //lint:ignore S117
-func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) {
+func xmlToJsonConv(fByteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) {
        var f dataTypes.MeasCollecFile
        start := time.Now()
-       err := xml.Unmarshal(*f_byteValue, &f)
+       err := xml.Unmarshal(*fByteValue, &f)
        if err != nil {
                return nil, errors.New("Cannot unmarshal xml-file")
        }
@@ -48,7 +48,6 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) (
        start = time.Now()
        var pmfile dataTypes.PMJsonFile
 
-       //TODO: Fill in more values
        pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
        pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
        pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
@@ -82,7 +81,6 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) (
 
        pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
 
-       //TODO: Fill more values
        pmfile.Event.CommonEventHeader.Domain = ""    //xfeh.Domain
        pmfile.Event.CommonEventHeader.EventID = ""   //xfeh.EventID
        pmfile.Event.CommonEventHeader.Sequence = 0   //xfeh.Sequence
@@ -109,8 +107,8 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) (
 }
 
 func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte {
-       evt_data := dataTypes.XmlFileEventHeader{}
-       jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evt_data)
+       evtData := dataTypes.XmlFileEventHeader{}
+       jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evtData)
 
        client := new(http.Client)
 
@@ -136,8 +134,8 @@ func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte {
                log.Error("Error reading response, discarding message, ", err)
                return nil
        }
-       file_bytes := buf3.Bytes()
+       fileBytes := buf3.Bytes()
        fmt.Println("Converting to XML")
-       b, err := xml_to_json_conv(&file_bytes, &evt_data)
+       b, err := xmlToJsonConv(&fileBytes, &evtData)
        return b
 }
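Convert above streams the presigned-URL response into buf3, gunzipping when compression is "gz"; that part of the function is unchanged and therefore not shown in the diff. A stand-alone sketch of that step with the standard library, where readBody is a hypothetical helper rather than repo code:

package xmltransform

import (
	"bytes"
	"compress/gzip"
	"io"
)

// readBody drains r, transparently gunzipping when the event header says
// the payload is gz-compressed.
func readBody(r io.Reader, compression string) ([]byte, error) {
	if compression == "gz" {
		gz, err := gzip.NewReader(r)
		if err != nil {
			return nil, err
		}
		defer gz.Close()
		r = gz
	}
	var buf bytes.Buffer
	if _, err := io.Copy(&buf, r); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
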
diff --git a/pm-file-converter/components/xmltransform/xmltransform_test.go b/pm-file-converter/components/xmltransform/xmltransform_test.go
index 9f12c99..5f6e89d 100644
@@ -63,7 +63,7 @@ func TestXMLToJSONConv_Success(t *testing.T) {
                return
        }
        file_bytes := buf3.Bytes()
-       b, err := xml_to_json_conv(&file_bytes, &evt_data)
+       b, err := xmlToJsonConv(&file_bytes, &evt_data)
 
        json_filename := "A20230515.0700_0100-0715_0100_GNODEB-0.json"
 
diff --git a/pm-file-converter/main.go b/pm-file-converter/main.go
index b931a2a..599a33b 100644
@@ -72,6 +72,8 @@ const (
        Terminating
 )
 
+const registeringProducer = "Registering producer: "
+
 // == Main ==//
 func main() {
 
@@ -93,11 +95,11 @@ func main() {
                producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
        }
 
-       go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
+       go kafkacollector.StartTopicWriter(writer_control, data_out_channel)
 
        //Setup proc for periodic type registration
-       var event_chan = make(chan int) //Channel for stopping the proc
-       go periodic_registration(event_chan)
+       var eventChan = make(chan int) //Channel for stopping the proc
+       go periodicRegistration(eventChan)
 
        //Wait for term/int signal do try to shut down gracefully
        sigs := make(chan os.Signal, 1)
@@ -105,7 +107,7 @@ func main() {
        go func() {
                sig := <-sigs
                fmt.Printf("Received signal %s - application will terminate\n", sig)
-               event_chan <- 0 // Stop periodic registration
+               eventChan <- 0 // Stop periodic registration
                datalock.Lock()
                defer datalock.Unlock()
                AppState = Terminating
@@ -122,7 +124,7 @@ func main() {
 
 // == Core functions ==//
 // Run periodic registration of producers
-func periodic_registration(evtch chan int) {
+func periodicRegistration(evtch chan int) {
        var delay int = 1
        for {
                select {
@@ -131,7 +133,7 @@ func periodic_registration(evtch chan int) {
                                return
                        }
                case <-time.After(time.Duration(delay) * time.Second):
-                       ok := register_producer()
+                       ok := registerProducer()
                        if ok {
                                delay = registration_delay_long
                        } else {
@@ -145,24 +147,26 @@ func periodic_registration(evtch chan int) {
        }
 }
 
-func register_producer() bool {
+func registerProducer() bool {
 
-       log.Info("Registering producer: ", producer_instance_name)
+       log.Info(registeringProducer, producer_instance_name)
 
        file, err := os.ReadFile(config_file)
        if err != nil {
                log.Error("Cannot read config file: ", config_file)
-               log.Error("Registering producer: ", producer_instance_name, " - failed")
+               // NOSONAR
+               log.Error(registeringProducer, producer_instance_name, " - failed")
                return false
        }
        data := dataTypes.DataTypes{}
        err = jsoniter.Unmarshal([]byte(file), &data)
        if err != nil {
                log.Error("Cannot parse config file: ", config_file)
-               log.Error("Registering producer: ", producer_instance_name, " - failed")
+               // NOSONAR
+               log.Error(registeringProducer, producer_instance_name, " - failed")
                return false
        }
-       var new_type_names []string
+       var newTypeNames []string
 
        for i := 0; i < len(data.ProdDataTypes); i++ {
                t1 := make(map[string]interface{})
@@ -178,57 +182,59 @@ func register_producer() bool {
                json, err := jsoniter.Marshal(t1)
                if err != nil {
                        log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
-                       log.Error("Registering producer: ", producer_instance_name, " - failed")
+                       // NOSONAR
+                       log.Error(registeringProducer, producer_instance_name, " - failed")
                        return false
                } else {
-                       ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
+                       ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
                        if !ok {
                                log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
-                               log.Error("Registering producer: ", producer_instance_name, " - failed")
+                               // NOSONAR
+                               log.Error(registeringProducer, producer_instance_name, " - failed")
                                return false
                        }
-                       new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
+                       newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
                }
 
        }
 
-       log.Debug("Registering types: ", new_type_names)
+       log.Debug("Registering types: ", newTypeNames)
        datalock.Lock()
        defer datalock.Unlock()
 
        for _, v := range data.ProdDataTypes {
                log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
-               start_type_job(v)
+               startTypeJob(v)
        }
 
        dataTypes.InfoTypes = data
        log.Debug("Datatypes: ", dataTypes.InfoTypes)
-       log.Info("Registering producer: ", producer_instance_name, " - OK")
+       log.Info(registeringProducer, producer_instance_name, " - OK")
        return true
 }
 
-func start_type_job(dp dataTypes.DataType) {
+func startTypeJob(dp dataTypes.DataType) {
        log.Info("Starting type job: ", dp.ID)
-       job_record := dataTypes.TypeJobRecord{}
+       jobRecord := dataTypes.TypeJobRecord{}
 
-       job_record.Job_control = make(chan dataTypes.JobControl, 1)
-       job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
-       job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
-       job_record.InfoType = dp.ID
-       job_record.InputTopic = dp.KafkaInputTopic
-       job_record.GroupId = "kafka-procon-" + dp.ID
-       job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
+       jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
+       jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
+       jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
+       jobRecord.InfoType = dp.ID
+       jobRecord.InputTopic = dp.KafkaInputTopic
+       jobRecord.GroupId = "kafka-procon-" + dp.ID
+       jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
 
        switch dp.ID {
        case "xml-file-data-to-filestore":
-               go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
+               go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
        case "xml-file-data":
-               go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
+               go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
        default:
        }
 
-       go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
+       go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)
 
-       dataTypes.TypeJobs[dp.ID] = job_record
+       dataTypes.TypeJobs[dp.ID] = jobRecord
        log.Debug("Type job input type: ", dp.InputJobType)
 }
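
periodicRegistration alternates between a short retry delay and a long steady-state delay depending on whether registerProducer succeeded. A self-contained sketch of the same pattern; the 5s/60s values stand in for the repo's registration_delay_short and registration_delay_long:

package main

import (
	"fmt"
	"time"
)

// Retry registration quickly until it succeeds, then re-register at a
// long interval; a 0 on the stop channel ends the loop.
func periodicRegistration(stop chan int, register func() bool) {
	delay := 1 * time.Second
	for {
		select {
		case msg := <-stop:
			if msg == 0 {
				return
			}
		case <-time.After(delay):
			if register() {
				delay = 60 * time.Second // registration_delay_long
			} else {
				delay = 5 * time.Second // registration_delay_short
			}
		}
	}
}

func main() {
	stop := make(chan int)
	go periodicRegistration(stop, func() bool { fmt.Println("register"); return true })
	time.Sleep(3 * time.Second)
	stop <- 0
}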