X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pm-file-converter%2Fmain.go;h=00289512341a84bd0e4b16a1a26016b51bf084f1;hb=refs%2Fchanges%2F37%2F12337%2F1;hp=b931a2a7a3bf9f2113cb804ae12d18cd598797bd;hpb=5902c2dbd9980f771e56b2d582f57b163c9b742e;p=nonrtric%2Fplt%2Franpm.git

diff --git a/pm-file-converter/main.go b/pm-file-converter/main.go
index b931a2a..0028951 100644
--- a/pm-file-converter/main.go
+++ b/pm-file-converter/main.go
@@ -3,7 +3,8 @@
 // ========================LICENSE_START=================================
 // O-RAN-SC
 // %%
-// Copyright (C) 2023: Nordix Foundation
+// Copyright (C) 2023: Nordix Foundation. All rights reserved.
+// Copyright (C) 2023 OpenInfra Foundation Europe. All rights reserved.
 // %%
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -21,8 +22,6 @@ package main
 
 import (
 	"fmt"
-	jsoniter "github.com/json-iterator/go"
-	log "github.com/sirupsen/logrus"
 	"main/common/dataTypes"
 	"main/common/utils"
 	"main/components/kafkacollector"
@@ -33,6 +32,9 @@ import (
 	"sync"
 	"syscall"
 	"time"
+
+	jsoniter "github.com/json-iterator/go"
+	log "github.com/sirupsen/logrus"
 )
 
 var ics_server = os.Getenv("ICS")
@@ -59,6 +61,8 @@ var writer_control = make(chan dataTypes.WriterControl, 1)
 const registration_delay_short = 2
 const registration_delay_long = 120
 
+const failedMessageLabel = " - failed"
+
 //== Variables ==//
 
 var AppState = Init
@@ -72,6 +76,8 @@ const (
 	Terminating
 )
 
+const registeringProducer = "Registering producer: "
+
 // == Main ==//
 func main() {
 
@@ -93,11 +99,11 @@ func main() {
 		producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
 	}
 
-	go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
+	go kafkacollector.StartTopicWriter(writer_control, data_out_channel)
 
 	//Setup proc for periodic type registration
-	var event_chan = make(chan int) //Channel for stopping the proc
-	go periodic_registration(event_chan)
+	var eventChan = make(chan int) //Channel for stopping the proc
+	go periodicRegistration(eventChan)
 
 	//Wait for term/int signal do try to shut down gracefully
 	sigs := make(chan os.Signal, 1)
@@ -105,7 +111,7 @@
 	go func() {
 		sig := <-sigs
 		fmt.Printf("Received signal %s - application will terminate\n", sig)
-		event_chan <- 0 // Stop periodic registration
+		eventChan <- 0 // Stop periodic registration
 		datalock.Lock()
 		defer datalock.Unlock()
 		AppState = Terminating
@@ -122,7 +128,7 @@
 // == Core functions ==//
 
 // Run periodic registration of producers
-func periodic_registration(evtch chan int) {
+func periodicRegistration(evtch chan int) {
 	var delay int = 1
 	for {
 		select {
@@ -131,7 +137,7 @@
 				return
 			}
 		case <-time.After(time.Duration(delay) * time.Second):
-			ok := register_producer()
+			ok := registerProducer()
 			if ok {
 				delay = registration_delay_long
 			} else {
@@ -145,24 +151,26 @@
 		}
 	}
 }
 
-func register_producer() bool {
+func registerProducer() bool {
 
-	log.Info("Registering producer: ", producer_instance_name)
+	log.Info(registeringProducer, producer_instance_name)
 	file, err := os.ReadFile(config_file)
 	if err != nil {
 		log.Error("Cannot read config file: ", config_file)
-		log.Error("Registering producer: ", producer_instance_name, " - failed")
+		// NOSONAR
+		log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
 		return false
 	}
 	data := dataTypes.DataTypes{}
 	err = jsoniter.Unmarshal([]byte(file), &data)
 	if err != nil {
 		log.Error("Cannot parse config file: ", config_file)
-		log.Error("Registering producer: ", producer_instance_name, " - failed")
+		// NOSONAR
+		log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
 		return false
 	}
-	var new_type_names []string
+	var newTypeNames []string
 
 	for i := 0; i < len(data.ProdDataTypes); i++ {
 		t1 := make(map[string]interface{})
@@ -178,57 +186,59 @@
 		json, err := jsoniter.Marshal(t1)
 		if err != nil {
 			log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
-			log.Error("Registering producer: ", producer_instance_name, " - failed")
+			// NOSONAR
+			log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
 			return false
 		} else {
-			ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
+			ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
 			if !ok {
 				log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
-				log.Error("Registering producer: ", producer_instance_name, " - failed")
+				// NOSONAR
+				log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
 				return false
 			}
-			new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
+			newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
 		}
 	}
 
-	log.Debug("Registering types: ", new_type_names)
+	log.Debug("Registering types: ", newTypeNames)
 	datalock.Lock()
 	defer datalock.Unlock()
 	for _, v := range data.ProdDataTypes {
 		log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
-		start_type_job(v)
+		startTypeJob(v)
 	}
 
 	dataTypes.InfoTypes = data
 	log.Debug("Datatypes: ", dataTypes.InfoTypes)
-	log.Info("Registering producer: ", producer_instance_name, " - OK")
+	log.Info(registeringProducer, producer_instance_name, " - OK")
 	return true
 }
 
-func start_type_job(dp dataTypes.DataType) {
+func startTypeJob(dp dataTypes.DataType) {
 	log.Info("Starting type job: ", dp.ID)
 
-	job_record := dataTypes.TypeJobRecord{}
+	jobRecord := dataTypes.TypeJobRecord{}
 
-	job_record.Job_control = make(chan dataTypes.JobControl, 1)
-	job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
-	job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
-	job_record.InfoType = dp.ID
-	job_record.InputTopic = dp.KafkaInputTopic
-	job_record.GroupId = "kafka-procon-" + dp.ID
-	job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
+	jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
+	jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
+	jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
+	jobRecord.InfoType = dp.ID
+	jobRecord.InputTopic = dp.KafkaInputTopic
+	jobRecord.GroupId = "kafka-procon-" + dp.ID
+	jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
 
 	switch dp.ID {
 	case "xml-file-data-to-filestore":
-		go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
+		go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
 	case "xml-file-data":
-		go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
+		go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
 	default:
 	}
 
-	go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
+	go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)
 
-	dataTypes.TypeJobs[dp.ID] = job_record
+	dataTypes.TypeJobs[dp.ID] = jobRecord
 	log.Debug("Type job input type: ", dp.InputJobType)
 }
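
For readers following the renames: periodicRegistration (formerly periodic_registration) is a retry-then-refresh loop, and the value sent on the control channel from the signal handler is what stops it. Below is a minimal, self-contained sketch of that pattern, with a hypothetical registerProducer stub standing in for the real ICS registration:

package main

import (
	"fmt"
	"time"
)

// Delay constants as in main.go: a short growth step while registration
// fails, a long refresh interval once it succeeds.
const registration_delay_short = 2
const registration_delay_long = 120

// registerProducer is a hypothetical stub; the real function reads the
// producer config and registers each info type with the ICS server.
func registerProducer() bool {
	fmt.Println("registering producer...")
	return true
}

// periodicRegistration mirrors the loop in main.go: on failure the delay
// grows by registration_delay_short until it reaches
// registration_delay_long; on success it jumps straight to
// registration_delay_long; receiving 0 on evtch ends the loop.
func periodicRegistration(evtch chan int) {
	delay := 1
	for {
		select {
		case msg := <-evtch:
			if msg == 0 {
				return
			}
		case <-time.After(time.Duration(delay) * time.Second):
			if registerProducer() {
				delay = registration_delay_long
			} else if delay < registration_delay_long {
				delay += registration_delay_short
			}
		}
	}
}

func main() {
	stop := make(chan int)
	go periodicRegistration(stop)
	time.Sleep(5 * time.Second) // let the first registration attempt run
	stop <- 0                   // stop the loop, as the SIGTERM/SIGINT handler does
}

Using time.After inside select keeps the stop channel responsive even during the 120 second refresh wait; a plain time.Sleep between attempts would delay shutdown by up to two minutes.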
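
The registration call itself, via the renamed utils.SendHttpRequest, is an HTTP PUT of each type definition to the ICS endpoint /data-producer/v1/info-types/{typeId}, as the URL built in registerProducer shows. SendHttpRequest's body is not part of this diff, so the following net/http sketch is only an assumption of what the call amounts to (the Content-Type header, status handling, host, and the empty type definition are illustrative guesses, and the token auth implied by creds_grant_type is omitted):

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// registerInfoType PUTs a JSON info-type definition to ICS, roughly what
// registerProducer does per type through utils.SendHttpRequest.
func registerInfoType(icsServer, typeID string, typeJSON []byte) error {
	url := "http://" + icsServer + "/data-producer/v1/info-types/" + typeID
	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(typeJSON))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("ICS returned %s for type %s", resp.Status, typeID)
	}
	return nil
}

func main() {
	// Hypothetical host and empty type definition, for illustration only.
	if err := registerInfoType("localhost:8083", "xml-file-data", []byte("{}")); err != nil {
		fmt.Println("registration failed:", err)
	}
}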