Terminating
)
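+ // Shared log prefix, extracted to a constant so the same string literal is not repeated at each call site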
+const registeringProducer = "Registering producer: "
+
// == Main ==//
func main() {
// KP is assumed to hold a per-instance suffix (e.g. a pod ordinal); only append it when set
if os.Getenv("KP") != "" {
producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
}
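+ // Start the proc that writes payloads from data_out_channel to Kafka topics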
- go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
+ go kafkacollector.StartTopicWriter(writer_control, data_out_channel)
//Set up proc for periodic type registration
- var event_chan = make(chan int) //Channel for stopping the proc
- go periodic_registration(event_chan)
+ var eventChan = make(chan int) //Channel for stopping the proc
+ go periodicRegistration(eventChan)
//Wait for term/int signal to try to shut down gracefully
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Received signal %s - application will terminate\n", sig)
- event_chan <- 0 // Stop periodic registration
+ eventChan <- 0 // Stop periodic registration
datalock.Lock()
defer datalock.Unlock()
AppState = Terminating
}()
// == Core functions ==//
// Run periodic registration of producers
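+ // A failed registration is retried on a short delay; once it succeeds, the long delay is used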
-func periodic_registration(evtch chan int) {
+func periodicRegistration(evtch chan int) {
var delay int = 1
for {
select {
case msg := <-evtch:
if msg == 0 { // Stop the proc
return
}
case <-time.After(time.Duration(delay) * time.Second):
- ok := register_producer()
+ ok := registerProducer()
if ok {
delay = registration_delay_long
} else {
// Assumed backoff: grow the delay by registration_delay_short until the long delay is reached
if delay < registration_delay_long {
delay += registration_delay_short
}
}
}
}
}
-func register_producer() bool {
+func registerProducer() bool {
- log.Info("Registering producer: ", producer_instance_name)
+ log.Info(registeringProducer, producer_instance_name)
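+ // The produced info types are re-read from config_file on every registration attempt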
file, err := os.ReadFile(config_file)
if err != nil {
log.Error("Cannot read config file: ", config_file)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, " - failed") // NOSONAR
return false
}
data := dataTypes.DataTypes{}
err = jsoniter.Unmarshal([]byte(file), &data)
if err != nil {
log.Error("Cannot parse config file: ", config_file)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, " - failed") // NOSONAR
return false
}
- var new_type_names []string
+ var newTypeNames []string
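+ // Each produced info type is registered individually with the ICS data-producer API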
for i := 0; i < len(data.ProdDataTypes); i++ {
t1 := make(map[string]interface{})
json, err := jsoniter.Marshal(t1)
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, " - failed") // NOSONAR
return false
} else {
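+ // The last argument is assumed to enable token-based authorization when a grant type is configured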
- ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
+ ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, " - failed") // NOSONAR
return false
}
- new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
+ newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
}
}
- log.Debug("Registering types: ", new_type_names)
+ log.Debug("Registering types: ", newTypeNames)
datalock.Lock()
defer datalock.Unlock()
for _, v := range data.ProdDataTypes {
log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
- start_type_job(v)
+ startTypeJob(v)
}
dataTypes.InfoTypes = data
log.Debug("Datatypes: ", dataTypes.InfoTypes)
- log.Info("Registering producer: ", producer_instance_name, " - OK")
+ log.Info(registeringProducer, producer_instance_name, " - OK")
return true
}
-func start_type_job(dp dataTypes.DataType) {
+func startTypeJob(dp dataTypes.DataType) {
log.Info("Starting type job: ", dp.ID)
- job_record := dataTypes.TypeJobRecord{}
+ jobRecord := dataTypes.TypeJobRecord{}
- job_record.Job_control = make(chan dataTypes.JobControl, 1)
- job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
- job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
- job_record.InfoType = dp.ID
- job_record.InputTopic = dp.KafkaInputTopic
- job_record.GroupId = "kafka-procon-" + dp.ID
- job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
+ jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
+ jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
+ jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
+ jobRecord.InfoType = dp.ID
+ jobRecord.InputTopic = dp.KafkaInputTopic
+ jobRecord.GroupId = "kafka-procon-" + dp.ID
+ jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
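+ // Pick the payload handler for the type; types without a dedicated handler only get the generic reader below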
switch dp.ID {
case "xml-file-data-to-filestore":
- go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
+ go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
case "xml-file-data":
- go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
+ go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
default:
}
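+ // Start the Kafka reader that consumes the input topic and feeds the job's data-in channel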
- go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
+ go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)
- dataTypes.TypeJobs[dp.ID] = job_record
+ dataTypes.TypeJobs[dp.ID] = jobRecord
log.Debug("Type job input type: ", dp.InputJobType)
}