Issue-ID: NONRTRIC-879
Signed-off-by: ambrishest <ambrish.singh@est.tech>
Change-Id: I59b81fa264a55a7432f0b9669309169c9ca55e36
Signed-off-by: ambrishest <ambrish.singh@est.tech>
var httpclient = &http.Client{}
// Send a http request with json (json may be nil)
var httpclient = &http.Client{}
// Send a http request with json (json may be nil)
-//
-//lint:ignore S100
-func Send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
+func SendHttpRequest(json []byte, method string, url string, retry bool, useAuth bool) bool {
// set the HTTP method, url, and request body
var req *http.Request
// set the HTTP method, url, and request body
var req *http.Request
- token, err := kafkacollector.Fetch_token()
+ token, err := kafkacollector.FetchToken()
if err != nil {
log.Error("Cannot fetch token for http request: ", err)
return false
if err != nil {
log.Error("Cannot fetch token for http request: ", err)
return false
validate := func(t *testing.T, tc *testCase) {
t.Run(tc.Name, func(t *testing.T) {
validate := func(t *testing.T, tc *testCase) {
t.Run(tc.Name, func(t *testing.T) {
- actualBool := Send_http_request(tc.Json, tc.Method, tc.Url, tc.Retry, tc.UseAuth)
+ actualBool := SendHttpRequest(tc.Json, tc.Method, tc.Url, tc.Retry, tc.UseAuth)
assert.Equal(t, tc.ExpectedBool, actualBool)
})
assert.Equal(t, tc.ExpectedBool, actualBool)
})
const parallelism_limiter = 100 //For all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
const parallelism_limiter = 100 //For all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
-// noinspection GoCognitiveComplexity
-func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
+const typeLabel = " for type: "
+const fetchTokenErrorMessage = "Cannot fetch token: "
+const setTokenErrorMessage = "Cannot set token: "
- log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
+// This function intentionally has high cognitive complexity // NOSONAR
+func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
+ log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
+
+ topicOk := false
var c *kafka.Consumer = nil
running := true
var c *kafka.Consumer = nil
running := true
- for topic_ok == false {
- case reader_ctrl := <-control_ch:
- if reader_ctrl.Command == "EXIT" {
- log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
- data_ch <- nil //Signal to job handler
+ case readerCtrl := <-controlCh:
+ if readerCtrl.Command == "EXIT" {
+ log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+ dataCh <- nil //Signal to job handler
- c = create_kafka_consumer(type_id, gid, cid)
+ c = createKafkaConsumer(typeId, gid, cid)
- log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
+ log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
- log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
+ log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
- if c != nil && topic_ok == false {
+ if c != nil && topicOk == false {
err := c.SubscribeTopics([]string{topic}, nil)
if err != nil {
err := c.SubscribeTopics([]string{topic}, nil)
if err != nil {
- log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
+ log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err)
- log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
- topic_ok = true
+ log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
+ topicOk = true
- log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
+ log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
- var event_chan = make(chan int)
+ var eventChan = make(chan int)
go func() {
for {
select {
go func() {
for {
select {
switch evt.(type) {
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", evt)
switch evt.(type) {
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", evt)
- token, err := Fetch_token()
+ token, err := FetchToken()
- log.Warning("Cannot cannot fetch token: ", err)
+ log.Warning(fetchTokenErrorMessage, err)
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
+ log.Warning(setTokenErrorMessage, setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
- log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
+ log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
- case msg := <-event_chan:
+ case msg := <-eventChan:
- case reader_ctrl := <-control_ch:
- if reader_ctrl.Command == "EXIT" {
- event_chan <- 0
- log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
- data_ch <- nil //Signal to job handler
+ case readerCtrl := <-controlCh:
+ if readerCtrl.Command == "EXIT" {
+ eventChan <- 0
+ log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+ dataCh <- nil //Signal to job handler
ev := c.Poll(1000)
if ev == nil {
ev := c.Poll(1000)
if ev == nil {
- log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
+ log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic)
continue
}
switch e := ev.(type) {
continue
}
switch e := ev.(type) {
log.Debug("Reader msg: ", &kmsg)
log.Debug("Reader msg: ", &kmsg)
- log.Debug("Reader - data_ch ", data_ch)
+ log.Debug("Reader - data_ch ", dataCh)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", ev)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", ev)
- token, err := Fetch_token()
+ token, err := FetchToken()
- log.Warning("Cannot cannot fetch token: ", err)
+ log.Warning(fetchTokenErrorMessage, err)
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
+ log.Warning(setTokenErrorMessage, setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
-func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {
+// This function intentionally has high cognitive complexity // NOSONAR
+func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
- var kafka_producer *kafka.Producer
+ var kafkaProducer *kafka.Producer
running := true
log.Info("Topic writer starting")
// Wait for kafka producer to become available - and be prepared to exit the writer
running := true
log.Info("Topic writer starting")
// Wait for kafka producer to become available - and be prepared to exit the writer
- for kafka_producer == nil {
+ for kafkaProducer == nil {
- case writer_ctl := <-control_ch:
- if writer_ctl.Command == "EXIT" {
+ case writerCtl := <-controlCh:
+ if writerCtl.Command == "EXIT" {
- kafka_producer = start_producer()
- if kafka_producer == nil {
+ kafkaProducer = startProducer()
+ if kafkaProducer == nil {
log.Debug("Could not start kafka producer - retrying")
time.Sleep(1 * time.Second)
} else {
log.Debug("Could not start kafka producer - retrying")
time.Sleep(1 * time.Second)
} else {
- var event_chan = make(chan int)
+ var eventChan = make(chan int)
go func() {
for {
select {
go func() {
for {
select {
- case evt := <-kafka_producer.Events():
+ case evt := <-kafkaProducer.Events():
switch evt.(type) {
case *kafka.Message:
m := evt.(*kafka.Message)
switch evt.(type) {
case *kafka.Message:
m := evt.(*kafka.Message)
log.Debug("Dumping topic writer event, error: ", evt)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New producer token needed: ", evt)
log.Debug("Dumping topic writer event, error: ", evt)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New producer token needed: ", evt)
- token, err := Fetch_token()
+ token, err := FetchToken()
- log.Warning("Cannot cannot fetch token: ", err)
- kafka_producer.SetOAuthBearerTokenFailure(err.Error())
+ log.Warning(fetchTokenErrorMessage, err)
+ kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
- setTokenError := kafka_producer.SetOAuthBearerToken(*token)
+ setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
if setTokenError != nil {
if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
- kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
+ log.Warning(setTokenErrorMessage, setTokenError)
+ kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
log.Debug("Dumping topic writer event, unknown: ", evt)
}
}
}
default:
log.Debug("Dumping topic writer event, unknown: ", evt)
}
- case msg := <-event_chan:
+ case msg := <-eventChan:
go func() {
for {
select {
go func() {
for {
select {
- case writer_ctl := <-control_ch:
- if writer_ctl.Command == "EXIT" {
+ case writerCtl := <-controlCh:
+ if writerCtl.Command == "EXIT" {
// ignore - wait for channel signal
}
// ignore - wait for channel signal
}
- case kmsg := <-data_ch:
log.Info("Topic writer stopped by channel signal - start_topic_writer")
log.Info("Topic writer stopped by channel signal - start_topic_writer")
- defer kafka_producer.Close()
+ defer kafkaProducer.Close()
- for retry := 1; retry <= retries && msg_ok == false; retry++ {
- err = kafka_producer.Produce(&kafka.Message{
+ for retry := 1; retry <= retries && msgOk == false; retry++ {
+ err = kafkaProducer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
if err == nil {
TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
if err == nil {
log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
} else {
log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
time.Sleep(time.Duration(retry) * time.Second)
}
}
log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
} else {
log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
time.Sleep(time.Duration(retry) * time.Second)
}
}
log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
}
case <-time.After(1000 * time.Millisecond):
log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
}
case <-time.After(1000 * time.Millisecond):
-func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
+// This function intentionally has high cognitive complexity // NOSONAR
+func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
var cm kafka.ConfigMap
if creds_grant_type == "" {
- log.Info("Creating kafka plain text consumer for type: ", type_id)
+ log.Info("Creating kafka plain text consumer for type: ", typeId)
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"group.id": gid,
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"group.id": gid,
"enable.auto.commit": false,
}
} else {
"enable.auto.commit": false,
}
} else {
- log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
+ log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"group.id": gid,
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"group.id": gid,
c, err := kafka.NewConsumer(&cm)
if err != nil {
c, err := kafka.NewConsumer(&cm)
if err != nil {
- log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
+ log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
- log.Info("Created kafka consumer for type: ", type_id, " OK")
+ log.Info("Created kafka consumer for type: ", typeId, " OK")
return c
}
// Start kafka producer
return c
}
// Start kafka producer
-func start_producer() *kafka.Producer {
+// NOSONAR
+func startProducer() *kafka.Producer {
log.Info("Creating kafka producer")
var cm kafka.ConfigMap
log.Info("Creating kafka producer")
var cm kafka.ConfigMap
-func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
log.Info("Type job", type_id, " started")
log.Info("Type job", type_id, " started")
- topic_list := make(map[string]string)
- topic_list[type_id] = "json-file-ready-kp"
- topic_list["PmData"] = "json-file-ready-kpadp"
+ topicList := make(map[string]string)
+ topicList[type_id] = "json-file-ready-kp"
+ topicList["PmData"] = "json-file-ready-kpadp"
running := true
for {
select {
running := true
for {
select {
- case job_ctl := <-control_ch:
- log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
- switch job_ctl.Command {
+ case jobCtl := <-control_ch:
+ log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
+ switch jobCtl.Command {
case "EXIT":
//ignore cmd - handled by channel signal
}
case "EXIT":
//ignore cmd - handled by channel signal
}
return
}
jobLimiterChan <- struct{}{}
return
}
jobLimiterChan <- struct{}{}
- go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)
+ go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
case <-time.After(1 * time.Second):
if !running {
case <-time.After(1 * time.Second):
if !running {
-func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
+// This function intentionally has more parameters for legacy compatibility // NOSONAR
+func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
defer func() {
<-jobLimiterChan
}()
start := time.Now()
defer func() {
<-jobLimiterChan
}()
start := time.Now()
- var evt_data dataTypes.XmlFileEventHeader
+ var evtData dataTypes.XmlFileEventHeader
- err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
+ err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
- log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
+ log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err)
- log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
+ log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())
- new_fn := miniocollector.Xml_to_json_conv(&evt_data)
+ newFn := miniocollector.XmlToJsonConv(&evtData)
- log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
+ log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err)
- log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String())
+ log.Debug("Converted file to json: ", newFn, " time", time.Since(start).String())
var fde dataTypes.FileDownloadedEvt
var fde dataTypes.FileDownloadedEvt
j, err := jsoniter.Marshal(fde)
if err != nil {
j, err := jsoniter.Marshal(fde)
if err != nil {
- msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
+ msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
log.Debug("Marshal file-collect event ", time.Since(start).String())
log.Debug("Marshal file-collect event ", time.Since(start).String())
- log.Debug("Sending file-collect event to output topic(s)", len(topic_list))
- for _, v := range topic_list {
+ log.Debug("Sending file-collect event to output topic(s)", len(topicList))
+ for _, v := range topicList {
fmt.Println("Output Topic: " + v)
var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
kmsg.Msg = msg.Msg
kmsg.Topic = v
fmt.Println("Output Topic: " + v)
var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
kmsg.Msg = msg.Msg
kmsg.Topic = v
- data_out_channel <- kmsg
-func Fetch_token() (*kafka.OAuthBearerToken, error) {
+// NOSONAR
+func FetchToken() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{
ClientID: creds_client_id,
log.Debug("Get token inline")
conf := &clientcredentials.Config{
ClientID: creds_client_id,
return nil, err
}
extensions := map[string]string{}
return nil, err
}
extensions := map[string]string{}
log.Debug("=====================================================")
log.Debug("token: ", token)
log.Debug("=====================================================")
log.Debug("=====================================================")
log.Debug("token: ", token)
log.Debug("=====================================================")
-func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
+func XmlToJsonConv(evtData *dataTypes.XmlFileEventHeader) string {
filestoreUser := os.Getenv("FILESTORE_USER")
filestorePwd := os.Getenv("FILESTORE_PWD")
filestoreServer := os.Getenv("FILESTORE_SERVER")
filestoreUser := os.Getenv("FILESTORE_USER")
filestorePwd := os.Getenv("FILESTORE_PWD")
filestoreServer := os.Getenv("FILESTORE_SERVER")
log.Fatalln(err)
}
expiry := time.Second * 24 * 60 * 60 // 1 day.
log.Fatalln(err)
}
expiry := time.Second * 24 * 60 * 60 // 1 day.
- objectName := evt_data.Name
- bucketName := evt_data.ObjectStoreBucket
- compresion := evt_data.Compression
+ objectName := evtData.Name
+ bucketName := evtData.ObjectStoreBucket
+ compresion := evtData.Compression
reqParams := make(url.Values)
reqParams := make(url.Values)
- xmlh, err := jsoniter.Marshal(evt_data)
+ xmlh, err := jsoniter.Marshal(evtData)
if err != nil {
fmt.Printf("Error: %s", err)
return ""
if err != nil {
fmt.Printf("Error: %s", err)
return ""
if err != nil {
log.Fatalln(err)
}
if err != nil {
log.Fatalln(err)
}
- file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
+ fileBytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
var buf bytes.Buffer
newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
var buf bytes.Buffer
- err = gzipWrite(&buf, &file_bytes)
- upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
+ err = gzipWrite(&buf, &fileBytes)
+ uploadObject(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
fmt.Println("")
return newObjectName
}
// nolint
fmt.Println("")
return newObjectName
}
// nolint
-func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) {
+func uploadObject(mc *minio.Client, b []byte, objectName string, fsbucket string) {
contentType := "application/json"
if strings.HasSuffix(objectName, ".gz") {
contentType = "application/gzip"
contentType := "application/json"
if strings.HasSuffix(objectName, ".gz") {
contentType = "application/gzip"
// Upload the xml file with PutObject
r := bytes.NewReader(b)
tctx := context.Background()
// Upload the xml file with PutObject
r := bytes.NewReader(b)
tctx := context.Background()
- if check_minio_bucket(mc, fsbucket) == false {
- err := create_minio_bucket(mc, fsbucket)
+ if checkMinioBucket(mc, fsbucket) == false {
+ err := createMinioBucket(mc, fsbucket)
if err != nil {
log.Error("Cannot create bucket: ", fsbucket, ", ", err)
return
if err != nil {
log.Error("Cannot create bucket: ", fsbucket, ", ", err)
return
-func create_minio_bucket(mc *minio.Client, bucket string) error {
+func createMinioBucket(mc *minio.Client, bucket string) error {
tctx := context.Background()
err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
if err != nil {
tctx := context.Background()
err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
if err != nil {
-func check_minio_bucket(mc *minio.Client, bucket string) bool {
+func checkMinioBucket(mc *minio.Client, bucket string) bool {
tctx := context.Background()
exists, err := mc.BucketExists(tctx, bucket)
if err == nil && exists {
tctx := context.Background()
exists, err := mc.BucketExists(tctx, bucket)
if err == nil && exists {
// Create a test bucket.
bucketName := "my-test-bucket"
// Create a test bucket.
bucketName := "my-test-bucket"
- err = create_minio_bucket(minioClient, bucketName)
+ err = createMinioBucket(minioClient, bucketName)
if err != nil {
log.Fatalf("Error creating bucket: %v", err)
} else {
if err != nil {
log.Fatalf("Error creating bucket: %v", err)
} else {
// Create a test bucket.
bucketName := ""
// Create a test bucket.
bucketName := ""
- err = create_minio_bucket(minioClient, bucketName)
+ err = createMinioBucket(minioClient, bucketName)
if err != nil {
assert.Error(t, err)
} else {
if err != nil {
assert.Error(t, err)
} else {
// Create a test bucket.
bucketName := "my-test-bucket"
// Create a test bucket.
bucketName := "my-test-bucket"
- found = check_minio_bucket(minioClient, bucketName)
+ found = checkMinioBucket(minioClient, bucketName)
if found {
assert.True(t, found)
} else {
if found {
assert.True(t, found)
} else {
// Create a test bucket.
bucketName := "my-test-bucket-not-exists"
// Create a test bucket.
bucketName := "my-test-bucket-not-exists"
- found = check_minio_bucket(minioClient, bucketName)
+ found = checkMinioBucket(minioClient, bucketName)
if found {
assert.True(t, found)
} else {
if found {
assert.True(t, found)
} else {
// Create a test bucket.
bucketName := "my-test-bucket"
// Create a test bucket.
bucketName := "my-test-bucket"
- upload_object(minioClient, file_bytes, "minio_upload_test.json", bucketName)
+ uploadObject(minioClient, file_bytes, "minio_upload_test.json", bucketName)
-func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) {
+func xmlToJsonConv(fBytevalue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) {
var f dataTypes.MeasCollecFile
start := time.Now()
var f dataTypes.MeasCollecFile
start := time.Now()
- err := xml.Unmarshal(*f_byteValue, &f)
+ err := xml.Unmarshal(*fBytevalue, &f)
if err != nil {
return nil, errors.New("Cannot unmarshal xml-file")
}
if err != nil {
return nil, errors.New("Cannot unmarshal xml-file")
}
start = time.Now()
var pmfile dataTypes.PMJsonFile
start = time.Now()
var pmfile dataTypes.PMJsonFile
- //TODO: Fill in more values
pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
- //TODO: Fill more values
pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain
pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID
pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence
pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain
pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID
pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence
}
func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte {
}
func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte {
- evt_data := dataTypes.XmlFileEventHeader{}
- jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evt_data)
+ evtData := dataTypes.XmlFileEventHeader{}
+ jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evtData)
client := new(http.Client)
client := new(http.Client)
log.Error("Error reading response, discarding message, ", err)
return nil
}
log.Error("Error reading response, discarding message, ", err)
return nil
}
- file_bytes := buf3.Bytes()
+ fileBytes := buf3.Bytes()
fmt.Println("Converting to XML")
fmt.Println("Converting to XML")
- b, err := xml_to_json_conv(&file_bytes, &evt_data)
+ b, err := xmlToJsonConv(&fileBytes, &evtData)
return
}
file_bytes := buf3.Bytes()
return
}
file_bytes := buf3.Bytes()
- b, err := xml_to_json_conv(&file_bytes, &evt_data)
+ b, err := xmlToJsonConv(&file_bytes, &evt_data)
json_filename := "A20230515.0700_0100-0715_0100_GNODEB-0.json"
json_filename := "A20230515.0700_0100-0715_0100_GNODEB-0.json"
+const registeringProducer = "Registering producer: "
+
// == Main ==//
func main() {
// == Main ==//
func main() {
producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
}
producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
}
- go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
+ go kafkacollector.StartTopicWriter(writer_control, data_out_channel)
//Setup proc for periodic type registration
//Setup proc for periodic type registration
- var event_chan = make(chan int) //Channel for stopping the proc
- go periodic_registration(event_chan)
+ var eventChan = make(chan int) //Channel for stopping the proc
+ go periodicRegistration(eventChan)
//Wait for term/int signal do try to shut down gracefully
sigs := make(chan os.Signal, 1)
//Wait for term/int signal do try to shut down gracefully
sigs := make(chan os.Signal, 1)
go func() {
sig := <-sigs
fmt.Printf("Received signal %s - application will terminate\n", sig)
go func() {
sig := <-sigs
fmt.Printf("Received signal %s - application will terminate\n", sig)
- event_chan <- 0 // Stop periodic registration
+ eventChan <- 0 // Stop periodic registration
datalock.Lock()
defer datalock.Unlock()
AppState = Terminating
datalock.Lock()
defer datalock.Unlock()
AppState = Terminating
// == Core functions ==//
// Run periodic registration of producers
// == Core functions ==//
// Run periodic registration of producers
-func periodic_registration(evtch chan int) {
+func periodicRegistration(evtch chan int) {
var delay int = 1
for {
select {
var delay int = 1
for {
select {
return
}
case <-time.After(time.Duration(delay) * time.Second):
return
}
case <-time.After(time.Duration(delay) * time.Second):
- ok := register_producer()
+ ok := registerProducer()
if ok {
delay = registration_delay_long
} else {
if ok {
delay = registration_delay_long
} else {
-func register_producer() bool {
+func registerProducer() bool {
- log.Info("Registering producer: ", producer_instance_name)
+ log.Info(registeringProducer, producer_instance_name)
file, err := os.ReadFile(config_file)
if err != nil {
log.Error("Cannot read config file: ", config_file)
file, err := os.ReadFile(config_file)
if err != nil {
log.Error("Cannot read config file: ", config_file)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
return false
}
data := dataTypes.DataTypes{}
err = jsoniter.Unmarshal([]byte(file), &data)
if err != nil {
log.Error("Cannot parse config file: ", config_file)
return false
}
data := dataTypes.DataTypes{}
err = jsoniter.Unmarshal([]byte(file), &data)
if err != nil {
log.Error("Cannot parse config file: ", config_file)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
- var new_type_names []string
+ var newTypeNames []string
for i := 0; i < len(data.ProdDataTypes); i++ {
t1 := make(map[string]interface{})
for i := 0; i < len(data.ProdDataTypes); i++ {
t1 := make(map[string]interface{})
json, err := jsoniter.Marshal(t1)
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
json, err := jsoniter.Marshal(t1)
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
- ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
+ ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
- new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
+ newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
- log.Debug("Registering types: ", new_type_names)
+ log.Debug("Registering types: ", newTypeNames)
datalock.Lock()
defer datalock.Unlock()
for _, v := range data.ProdDataTypes {
log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
datalock.Lock()
defer datalock.Unlock()
for _, v := range data.ProdDataTypes {
log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
}
dataTypes.InfoTypes = data
log.Debug("Datatypes: ", dataTypes.InfoTypes)
}
dataTypes.InfoTypes = data
log.Debug("Datatypes: ", dataTypes.InfoTypes)
- log.Info("Registering producer: ", producer_instance_name, " - OK")
+ log.Info(registeringProducer, producer_instance_name, " - OK")
-func start_type_job(dp dataTypes.DataType) {
+func startTypeJob(dp dataTypes.DataType) {
log.Info("Starting type job: ", dp.ID)
log.Info("Starting type job: ", dp.ID)
- job_record := dataTypes.TypeJobRecord{}
+ jobRecord := dataTypes.TypeJobRecord{}
- job_record.Job_control = make(chan dataTypes.JobControl, 1)
- job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
- job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
- job_record.InfoType = dp.ID
- job_record.InputTopic = dp.KafkaInputTopic
- job_record.GroupId = "kafka-procon-" + dp.ID
- job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
+ jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
+ jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
+ jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
+ jobRecord.InfoType = dp.ID
+ jobRecord.InputTopic = dp.KafkaInputTopic
+ jobRecord.GroupId = "kafka-procon-" + dp.ID
+ jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
switch dp.ID {
case "xml-file-data-to-filestore":
switch dp.ID {
case "xml-file-data-to-filestore":
- go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
+ go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
- go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
+ go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
- go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
+ go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)
- dataTypes.TypeJobs[dp.ID] = job_record
+ dataTypes.TypeJobs[dp.ID] = jobRecord
log.Debug("Type job input type: ", dp.InputJobType)
}
log.Debug("Type job input type: ", dp.InputJobType)
}