From 6e3dc81c76fc88e000b83c35282d3db823f815a3 Mon Sep 17 00:00:00 2001 From: ambrishest Date: Wed, 4 Oct 2023 16:47:32 +0100 Subject: [PATCH] Fixing Sonar Code Smell for PMConverter Issue-ID: NONRTRIC-879 Signed-off-by: ambrishest Change-Id: I59b81fa264a55a7432f0b9669309169c9ca55e36 Signed-off-by: ambrishest --- pm-file-converter/common/utils/utils.go | 6 +- pm-file-converter/common/utils/utils_test.go | 2 +- .../components/kafkacollector/kafkacollector.go | 186 +++++++++++---------- .../components/miniocollector/miniocollector.go | 26 +-- .../miniocollector/miniocollector_test.go | 10 +- .../components/xmltransform/xmltransform.go | 14 +- .../components/xmltransform/xmltransform_test.go | 2 +- pm-file-converter/main.go | 68 ++++---- 8 files changed, 163 insertions(+), 151 deletions(-) diff --git a/pm-file-converter/common/utils/utils.go b/pm-file-converter/common/utils/utils.go index dccf3c5..7be5819 100644 --- a/pm-file-converter/common/utils/utils.go +++ b/pm-file-converter/common/utils/utils.go @@ -30,9 +30,7 @@ import ( var httpclient = &http.Client{} // Send a http request with json (json may be nil) -// -//lint:ignore S100 -func Send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool { +func SendHttpRequest(json []byte, method string, url string, retry bool, useAuth bool) bool { // set the HTTP method, url, and request body var req *http.Request @@ -49,7 +47,7 @@ func Send_http_request(json []byte, method string, url string, retry bool, useAu } if useAuth { - token, err := kafkacollector.Fetch_token() + token, err := kafkacollector.FetchToken() if err != nil { log.Error("Cannot fetch token for http request: ", err) return false diff --git a/pm-file-converter/common/utils/utils_test.go b/pm-file-converter/common/utils/utils_test.go index 2cf3f1f..166a27e 100644 --- a/pm-file-converter/common/utils/utils_test.go +++ b/pm-file-converter/common/utils/utils_test.go @@ -20,7 +20,7 @@ func TestSend_http_request(t *testing.T) { validate := func(t *testing.T, tc *testCase) { t.Run(tc.Name, func(t *testing.T) { - actualBool := Send_http_request(tc.Json, tc.Method, tc.Url, tc.Retry, tc.UseAuth) + actualBool := SendHttpRequest(tc.Json, tc.Method, tc.Url, tc.Retry, tc.UseAuth) assert.Equal(t, tc.ExpectedBool, actualBool) }) diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go index 7451dd2..a7eaebf 100644 --- a/pm-file-converter/components/kafkacollector/kafkacollector.go +++ b/pm-file-converter/components/kafkacollector/kafkacollector.go @@ -43,22 +43,26 @@ var creds_service_url = os.Getenv("AUTH_SERVICE_URL") const parallelism_limiter = 100 //For all jobs var jobLimiterChan = make(chan struct{}, parallelism_limiter) -// noinspection GoCognitiveComplexity -func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) { +const typeLabel = " for type: " +const fetchTokenErrorMessage = "Cannot fetch token: " +const setTokenErrorMessage = "Cannot set token: " - log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id) +// This function intentionally has high cognitive complexity // NOSONAR +func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) { - topic_ok := false + log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId) + + topicOk := false var c *kafka.Consumer = nil running := true - for topic_ok == false { + for topicOk == false { select { - case reader_ctrl := <-control_ch: - if reader_ctrl.Command == "EXIT" { - log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") - data_ch <- nil //Signal to job handler + case readerCtrl := <-controlCh: + if readerCtrl.Command == "EXIT" { + log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped") + dataCh <- nil //Signal to job handler running = false return } @@ -67,27 +71,27 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. return } if c == nil { - c = create_kafka_consumer(type_id, gid, cid) + c = createKafkaConsumer(typeId, gid, cid) if c == nil { - log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying") + log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying") } else { - log.Info("Consumer started on topic: ", topic, " for type: ", type_id) + log.Info("Consumer started on topic: ", topic, typeLabel, typeId) } } - if c != nil && topic_ok == false { + if c != nil && topicOk == false { err := c.SubscribeTopics([]string{topic}, nil) if err != nil { - log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err) + log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err) } else { - log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id) - topic_ok = true + log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId) + topicOk = true } } } } - log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id) + log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId) - var event_chan = make(chan int) + var eventChan = make(chan int) go func() { for { select { @@ -95,22 +99,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. switch evt.(type) { case kafka.OAuthBearerTokenRefresh: log.Debug("New consumer token needed: ", evt) - token, err := Fetch_token() + token, err := FetchToken() if err != nil { - log.Warning("Cannot cannot fetch token: ", err) + log.Warning(fetchTokenErrorMessage, err) c.SetOAuthBearerTokenFailure(err.Error()) } else { setTokenError := c.SetOAuthBearerToken(*token) if setTokenError != nil { - log.Warning("Cannot cannot set token: ", setTokenError) + log.Warning(setTokenErrorMessage, setTokenError) c.SetOAuthBearerTokenFailure(setTokenError.Error()) } } default: - log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String()) + log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String()) } - case msg := <-event_chan: + case msg := <-eventChan: if msg == 0 { return } @@ -126,11 +130,11 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. for { for { select { - case reader_ctrl := <-control_ch: - if reader_ctrl.Command == "EXIT" { - event_chan <- 0 - log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") - data_ch <- nil //Signal to job handler + case readerCtrl := <-controlCh: + if readerCtrl.Command == "EXIT" { + eventChan <- 0 + log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped") + dataCh <- nil //Signal to job handler defer c.Close() return } @@ -138,7 +142,7 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. ev := c.Poll(1000) if ev == nil { - log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic) + log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic) continue } switch e := ev.(type) { @@ -148,22 +152,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. c.Commit() - data_ch <- &kmsg + dataCh <- &kmsg log.Debug("Reader msg: ", &kmsg) - log.Debug("Reader - data_ch ", data_ch) + log.Debug("Reader - data_ch ", dataCh) case kafka.Error: fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) case kafka.OAuthBearerTokenRefresh: log.Debug("New consumer token needed: ", ev) - token, err := Fetch_token() + token, err := FetchToken() if err != nil { - log.Warning("Cannot cannot fetch token: ", err) + log.Warning(fetchTokenErrorMessage, err) c.SetOAuthBearerTokenFailure(err.Error()) } else { setTokenError := c.SetOAuthBearerToken(*token) if setTokenError != nil { - log.Warning("Cannot cannot set token: ", setTokenError) + log.Warning(setTokenErrorMessage, setTokenError) c.SetOAuthBearerTokenFailure(setTokenError.Error()) } } @@ -176,23 +180,24 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. }() } -func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) { +// This function intentionally has high cognitive complexity // NOSONAR +func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) { - var kafka_producer *kafka.Producer + var kafkaProducer *kafka.Producer running := true log.Info("Topic writer starting") // Wait for kafka producer to become available - and be prepared to exit the writer - for kafka_producer == nil { + for kafkaProducer == nil { select { - case writer_ctl := <-control_ch: - if writer_ctl.Command == "EXIT" { + case writerCtl := <-controlCh: + if writerCtl.Command == "EXIT" { //ignore cmd } default: - kafka_producer = start_producer() - if kafka_producer == nil { + kafkaProducer = startProducer() + if kafkaProducer == nil { log.Debug("Could not start kafka producer - retrying") time.Sleep(1 * time.Second) } else { @@ -201,11 +206,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d } } - var event_chan = make(chan int) + var eventChan = make(chan int) go func() { for { select { - case evt := <-kafka_producer.Events(): + case evt := <-kafkaProducer.Events(): switch evt.(type) { case *kafka.Message: m := evt.(*kafka.Message) @@ -219,22 +224,22 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d log.Debug("Dumping topic writer event, error: ", evt) case kafka.OAuthBearerTokenRefresh: log.Debug("New producer token needed: ", evt) - token, err := Fetch_token() + token, err := FetchToken() if err != nil { - log.Warning("Cannot cannot fetch token: ", err) - kafka_producer.SetOAuthBearerTokenFailure(err.Error()) + log.Warning(fetchTokenErrorMessage, err) + kafkaProducer.SetOAuthBearerTokenFailure(err.Error()) } else { - setTokenError := kafka_producer.SetOAuthBearerToken(*token) + setTokenError := kafkaProducer.SetOAuthBearerToken(*token) if setTokenError != nil { - log.Warning("Cannot cannot set token: ", setTokenError) - kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error()) + log.Warning(setTokenErrorMessage, setTokenError) + kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error()) } } default: log.Debug("Dumping topic writer event, unknown: ", evt) } - case msg := <-event_chan: + case msg := <-eventChan: if msg == 0 { return } @@ -248,36 +253,36 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d go func() { for { select { - case writer_ctl := <-control_ch: - if writer_ctl.Command == "EXIT" { + case writerCtl := <-controlCh: + if writerCtl.Command == "EXIT" { // ignore - wait for channel signal } - case kmsg := <-data_ch: + case kmsg := <-dataCh: if kmsg == nil { - event_chan <- 0 + eventChan <- 0 log.Info("Topic writer stopped by channel signal - start_topic_writer") - defer kafka_producer.Close() + defer kafkaProducer.Close() return } retries := 10 - msg_ok := false + msgOk := false var err error - for retry := 1; retry <= retries && msg_ok == false; retry++ { - err = kafka_producer.Produce(&kafka.Message{ + for retry := 1; retry <= retries && msgOk == false; retry++ { + err = kafkaProducer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny}, Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil) if err == nil { - msg_ok = true + msgOk = true log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic) } else { log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err) time.Sleep(time.Duration(retry) * time.Second) } } - if !msg_ok { + if !msgOk { log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err) } case <-time.After(1000 * time.Millisecond): @@ -289,10 +294,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d }() } -func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer { +// This function intentionally has high cognitive complexity // NOSONAR +func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer { var cm kafka.ConfigMap if creds_grant_type == "" { - log.Info("Creating kafka plain text consumer for type: ", type_id) + log.Info("Creating kafka plain text consumer for type: ", typeId) cm = kafka.ConfigMap{ "bootstrap.servers": bootstrapserver, "group.id": gid, @@ -301,7 +307,7 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum "enable.auto.commit": false, } } else { - log.Info("Creating kafka SASL plain text consumer for type: ", type_id) + log.Info("Creating kafka SASL plain text consumer for type: ", typeId) cm = kafka.ConfigMap{ "bootstrap.servers": bootstrapserver, "group.id": gid, @@ -315,16 +321,17 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum c, err := kafka.NewConsumer(&cm) if err != nil { - log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err) + log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err) return nil } - log.Info("Created kafka consumer for type: ", type_id, " OK") + log.Info("Created kafka consumer for type: ", typeId, " OK") return c } // Start kafka producer -func start_producer() *kafka.Producer { +// NOSONAR +func startProducer() *kafka.Producer { log.Info("Creating kafka producer") var cm kafka.ConfigMap @@ -350,18 +357,18 @@ func start_producer() *kafka.Producer { return p } -func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) { +func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) { log.Info("Type job", type_id, " started") - topic_list := make(map[string]string) - topic_list[type_id] = "json-file-ready-kp" - topic_list["PmData"] = "json-file-ready-kpadp" + topicList := make(map[string]string) + topicList[type_id] = "json-file-ready-kp" + topicList["PmData"] = "json-file-ready-kpadp" running := true for { select { - case job_ctl := <-control_ch: - log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command) - switch job_ctl.Command { + case jobCtl := <-control_ch: + log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command) + switch jobCtl.Command { case "EXIT": //ignore cmd - handled by channel signal } @@ -374,7 +381,7 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro return } jobLimiterChan <- struct{}{} - go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket) + go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket) case <-time.After(1 * time.Second): if !running { @@ -384,30 +391,31 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro } } -func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) { +// This function intentionally has more parameters for legacy compatibility // NOSONAR +func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) { defer func() { <-jobLimiterChan }() start := time.Now() - var evt_data dataTypes.XmlFileEventHeader + var evtData dataTypes.XmlFileEventHeader - err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data) + err := jsoniter.Unmarshal(msg.Msg.Value, &evtData) if err != nil { - log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err) + log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err) return } - log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String()) + log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String()) start = time.Now() - new_fn := miniocollector.Xml_to_json_conv(&evt_data) + newFn := miniocollector.XmlToJsonConv(&evtData) if err != nil { - log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err) + log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err) return } - log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String()) + log.Debug("Converted file to json: ", newFn, " time", time.Since(start).String()) var fde dataTypes.FileDownloadedEvt - fde.Filename = new_fn + fde.Filename = newFn j, err := jsoniter.Marshal(fde) if err != nil { @@ -416,19 +424,20 @@ func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression } msg.Msg.Value = j - msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"") + msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"") log.Debug("Marshal file-collect event ", time.Since(start).String()) - log.Debug("Sending file-collect event to output topic(s)", len(topic_list)) - for _, v := range topic_list { + log.Debug("Sending file-collect event to output topic(s)", len(topicList)) + for _, v := range topicList { fmt.Println("Output Topic: " + v) var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload) kmsg.Msg = msg.Msg kmsg.Topic = v - data_out_channel <- kmsg + dataOutChannel <- kmsg } } -func Fetch_token() (*kafka.OAuthBearerToken, error) { +// NOSONAR +func FetchToken() (*kafka.OAuthBearerToken, error) { log.Debug("Get token inline") conf := &clientcredentials.Config{ ClientID: creds_client_id, @@ -441,6 +450,7 @@ func Fetch_token() (*kafka.OAuthBearerToken, error) { return nil, err } extensions := map[string]string{} + log.Debug("=====================================================") log.Debug("token: ", token) log.Debug("=====================================================") diff --git a/pm-file-converter/components/miniocollector/miniocollector.go b/pm-file-converter/components/miniocollector/miniocollector.go index ac8ce05..24ff41f 100644 --- a/pm-file-converter/components/miniocollector/miniocollector.go +++ b/pm-file-converter/components/miniocollector/miniocollector.go @@ -39,7 +39,7 @@ import ( ) // nolint -func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string { +func XmlToJsonConv(evtData *dataTypes.XmlFileEventHeader) string { filestoreUser := os.Getenv("FILESTORE_USER") filestorePwd := os.Getenv("FILESTORE_PWD") filestoreServer := os.Getenv("FILESTORE_SERVER") @@ -52,12 +52,12 @@ func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string { log.Fatalln(err) } expiry := time.Second * 24 * 60 * 60 // 1 day. - objectName := evt_data.Name - bucketName := evt_data.ObjectStoreBucket - compresion := evt_data.Compression + objectName := evtData.Name + bucketName := evtData.ObjectStoreBucket + compresion := evtData.Compression reqParams := make(url.Values) - xmlh, err := jsoniter.Marshal(evt_data) + xmlh, err := jsoniter.Marshal(evtData) if err != nil { fmt.Printf("Error: %s", err) return "" @@ -68,18 +68,18 @@ func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string { if err != nil { log.Fatalln(err) } - file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh)) + fileBytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh)) newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz" var buf bytes.Buffer - err = gzipWrite(&buf, &file_bytes) - upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json") + err = gzipWrite(&buf, &fileBytes) + uploadObject(s3Client, buf.Bytes(), newObjectName, "pm-files-json") fmt.Println("") return newObjectName } // nolint -func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) { +func uploadObject(mc *minio.Client, b []byte, objectName string, fsbucket string) { contentType := "application/json" if strings.HasSuffix(objectName, ".gz") { contentType = "application/gzip" @@ -88,8 +88,8 @@ func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket strin // Upload the xml file with PutObject r := bytes.NewReader(b) tctx := context.Background() - if check_minio_bucket(mc, fsbucket) == false { - err := create_minio_bucket(mc, fsbucket) + if checkMinioBucket(mc, fsbucket) == false { + err := createMinioBucket(mc, fsbucket) if err != nil { log.Error("Cannot create bucket: ", fsbucket, ", ", err) return @@ -113,7 +113,7 @@ func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket strin } // nolint -func create_minio_bucket(mc *minio.Client, bucket string) error { +func createMinioBucket(mc *minio.Client, bucket string) error { tctx := context.Background() err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{}) if err != nil { @@ -132,7 +132,7 @@ func create_minio_bucket(mc *minio.Client, bucket string) error { } // nolint -func check_minio_bucket(mc *minio.Client, bucket string) bool { +func checkMinioBucket(mc *minio.Client, bucket string) bool { tctx := context.Background() exists, err := mc.BucketExists(tctx, bucket) if err == nil && exists { diff --git a/pm-file-converter/components/miniocollector/miniocollector_test.go b/pm-file-converter/components/miniocollector/miniocollector_test.go index 17327a8..121799d 100644 --- a/pm-file-converter/components/miniocollector/miniocollector_test.go +++ b/pm-file-converter/components/miniocollector/miniocollector_test.go @@ -33,7 +33,7 @@ func TestMake_minio_bucket(t *testing.T) { // Create a test bucket. bucketName := "my-test-bucket" - err = create_minio_bucket(minioClient, bucketName) + err = createMinioBucket(minioClient, bucketName) if err != nil { log.Fatalf("Error creating bucket: %v", err) } else { @@ -61,7 +61,7 @@ func Test_bucket_cannot_empty(t *testing.T) { // Create a test bucket. bucketName := "" - err = create_minio_bucket(minioClient, bucketName) + err = createMinioBucket(minioClient, bucketName) if err != nil { assert.Error(t, err) } else { @@ -91,7 +91,7 @@ func Test_check_minio_bucket(t *testing.T) { // Create a test bucket. bucketName := "my-test-bucket" - found = check_minio_bucket(minioClient, bucketName) + found = checkMinioBucket(minioClient, bucketName) if found { assert.True(t, found) } else { @@ -121,7 +121,7 @@ func Test_bucket_not_exists(t *testing.T) { // Create a test bucket. bucketName := "my-test-bucket-not-exists" - found = check_minio_bucket(minioClient, bucketName) + found = checkMinioBucket(minioClient, bucketName) if found { assert.True(t, found) } else { @@ -168,7 +168,7 @@ func Test_upload_object(t *testing.T) { // Create a test bucket. bucketName := "my-test-bucket" - upload_object(minioClient, file_bytes, "minio_upload_test.json", bucketName) + uploadObject(minioClient, file_bytes, "minio_upload_test.json", bucketName) assert.NoError(t, err) diff --git a/pm-file-converter/components/xmltransform/xmltransform.go b/pm-file-converter/components/xmltransform/xmltransform.go index e7e04e4..6e70022 100644 --- a/pm-file-converter/components/xmltransform/xmltransform.go +++ b/pm-file-converter/components/xmltransform/xmltransform.go @@ -36,10 +36,10 @@ import ( ) //lint:ignore S117 -func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) { +func xmlToJsonConv(fBytevalue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) { var f dataTypes.MeasCollecFile start := time.Now() - err := xml.Unmarshal(*f_byteValue, &f) + err := xml.Unmarshal(*fBytevalue, &f) if err != nil { return nil, errors.New("Cannot unmarshal xml-file") } @@ -48,7 +48,6 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ( start = time.Now() var pmfile dataTypes.PMJsonFile - //TODO: Fill in more values pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0" pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = "" @@ -82,7 +81,6 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ( pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900 - //TODO: Fill more values pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence @@ -109,8 +107,8 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ( } func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte { - evt_data := dataTypes.XmlFileEventHeader{} - jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evt_data) + evtData := dataTypes.XmlFileEventHeader{} + jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evtData) client := new(http.Client) @@ -136,8 +134,8 @@ func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte { log.Error("Error reading response, discarding message, ", err) return nil } - file_bytes := buf3.Bytes() + fileBytes := buf3.Bytes() fmt.Println("Converting to XML") - b, err := xml_to_json_conv(&file_bytes, &evt_data) + b, err := xmlToJsonConv(&fileBytes, &evtData) return b } diff --git a/pm-file-converter/components/xmltransform/xmltransform_test.go b/pm-file-converter/components/xmltransform/xmltransform_test.go index 9f12c99..5f6e89d 100644 --- a/pm-file-converter/components/xmltransform/xmltransform_test.go +++ b/pm-file-converter/components/xmltransform/xmltransform_test.go @@ -63,7 +63,7 @@ func TestXMLToJSONConv_Success(t *testing.T) { return } file_bytes := buf3.Bytes() - b, err := xml_to_json_conv(&file_bytes, &evt_data) + b, err := xmlToJsonConv(&file_bytes, &evt_data) json_filename := "A20230515.0700_0100-0715_0100_GNODEB-0.json" diff --git a/pm-file-converter/main.go b/pm-file-converter/main.go index b931a2a..599a33b 100644 --- a/pm-file-converter/main.go +++ b/pm-file-converter/main.go @@ -72,6 +72,8 @@ const ( Terminating ) +const registeringProducer = "Registering producer: " + // == Main ==// func main() { @@ -93,11 +95,11 @@ func main() { producer_instance_name = producer_instance_name + "-" + os.Getenv("KP") } - go kafkacollector.Start_topic_writer(writer_control, data_out_channel) + go kafkacollector.StartTopicWriter(writer_control, data_out_channel) //Setup proc for periodic type registration - var event_chan = make(chan int) //Channel for stopping the proc - go periodic_registration(event_chan) + var eventChan = make(chan int) //Channel for stopping the proc + go periodicRegistration(eventChan) //Wait for term/int signal do try to shut down gracefully sigs := make(chan os.Signal, 1) @@ -105,7 +107,7 @@ func main() { go func() { sig := <-sigs fmt.Printf("Received signal %s - application will terminate\n", sig) - event_chan <- 0 // Stop periodic registration + eventChan <- 0 // Stop periodic registration datalock.Lock() defer datalock.Unlock() AppState = Terminating @@ -122,7 +124,7 @@ func main() { // == Core functions ==// // Run periodic registration of producers -func periodic_registration(evtch chan int) { +func periodicRegistration(evtch chan int) { var delay int = 1 for { select { @@ -131,7 +133,7 @@ func periodic_registration(evtch chan int) { return } case <-time.After(time.Duration(delay) * time.Second): - ok := register_producer() + ok := registerProducer() if ok { delay = registration_delay_long } else { @@ -145,24 +147,26 @@ func periodic_registration(evtch chan int) { } } -func register_producer() bool { +func registerProducer() bool { - log.Info("Registering producer: ", producer_instance_name) + log.Info(registeringProducer, producer_instance_name) file, err := os.ReadFile(config_file) if err != nil { log.Error("Cannot read config file: ", config_file) - log.Error("Registering producer: ", producer_instance_name, " - failed") + // NOSONAR + log.Error(registeringProducer, producer_instance_name, " - failed") return false } data := dataTypes.DataTypes{} err = jsoniter.Unmarshal([]byte(file), &data) if err != nil { log.Error("Cannot parse config file: ", config_file) - log.Error("Registering producer: ", producer_instance_name, " - failed") + // NOSONAR + log.Error(registeringProducer, producer_instance_name, " - failed") return false } - var new_type_names []string + var newTypeNames []string for i := 0; i < len(data.ProdDataTypes); i++ { t1 := make(map[string]interface{}) @@ -178,57 +182,59 @@ func register_producer() bool { json, err := jsoniter.Marshal(t1) if err != nil { log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID) - log.Error("Registering producer: ", producer_instance_name, " - failed") + // NOSONAR + log.Error(registeringProducer, producer_instance_name, " - failed") return false } else { - ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "") + ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "") if !ok { log.Error("Cannot register type: ", data.ProdDataTypes[i].ID) - log.Error("Registering producer: ", producer_instance_name, " - failed") + // NOSONAR + log.Error(registeringProducer, producer_instance_name, " - failed") return false } - new_type_names = append(new_type_names, data.ProdDataTypes[i].ID) + newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID) } } - log.Debug("Registering types: ", new_type_names) + log.Debug("Registering types: ", newTypeNames) datalock.Lock() defer datalock.Unlock() for _, v := range data.ProdDataTypes { log.Info("Adding type job for type: ", v.ID, " Type added to configuration") - start_type_job(v) + startTypeJob(v) } dataTypes.InfoTypes = data log.Debug("Datatypes: ", dataTypes.InfoTypes) - log.Info("Registering producer: ", producer_instance_name, " - OK") + log.Info(registeringProducer, producer_instance_name, " - OK") return true } -func start_type_job(dp dataTypes.DataType) { +func startTypeJob(dp dataTypes.DataType) { log.Info("Starting type job: ", dp.ID) - job_record := dataTypes.TypeJobRecord{} + jobRecord := dataTypes.TypeJobRecord{} - job_record.Job_control = make(chan dataTypes.JobControl, 1) - job_record.Reader_control = make(chan dataTypes.ReaderControl, 1) - job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length) - job_record.InfoType = dp.ID - job_record.InputTopic = dp.KafkaInputTopic - job_record.GroupId = "kafka-procon-" + dp.ID - job_record.ClientId = dp.ID + "-" + os.Getenv("KP") + jobRecord.Job_control = make(chan dataTypes.JobControl, 1) + jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1) + jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length) + jobRecord.InfoType = dp.ID + jobRecord.InputTopic = dp.KafkaInputTopic + jobRecord.GroupId = "kafka-procon-" + dp.ID + jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP") switch dp.ID { case "xml-file-data-to-filestore": - go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json") + go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json") case "xml-file-data": - go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "") + go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "") default: } - go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId) + go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId) - dataTypes.TypeJobs[dp.ID] = job_record + dataTypes.TypeJobs[dp.ID] = jobRecord log.Debug("Type job input type: ", dp.InputJobType) } -- 2.16.6