const parallelism_limiter = 100 //For all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
-// noinspection GoCognitiveComplexity
-func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
+const typeLabel = " for type: "
+const fetchTokenErrorMessage = "Cannot fetch token: "
+const setTokenErrorMessage = "Cannot set token: "
- log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
+// This function intentionally has high cognitive complexity // NOSONAR
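+// StartTopicReader creates a kafka consumer, subscribes to the given topic and
+// forwards polled messages to dataCh; an EXIT command on controlCh stops the
+// reader and sends nil on dataCh to signal the job handler.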
+func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
- topic_ok := false
+ log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
+
+ topicOk := false
var c *kafka.Consumer = nil
running := true
- for topic_ok == false {
+	for !topicOk {
select {
- case reader_ctrl := <-control_ch:
- if reader_ctrl.Command == "EXIT" {
- log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
- data_ch <- nil //Signal to job handler
+ case readerCtrl := <-controlCh:
+ if readerCtrl.Command == "EXIT" {
+ log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+ dataCh <- nil //Signal to job handler
running = false
return
}
return
}
if c == nil {
- c = create_kafka_consumer(type_id, gid, cid)
+ c = createKafkaConsumer(typeId, gid, cid)
if c == nil {
- log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
+ log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying")
} else {
- log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
+ log.Info("Consumer started on topic: ", topic, typeLabel, typeId)
}
}
- if c != nil && topic_ok == false {
+		if c != nil && !topicOk {
err := c.SubscribeTopics([]string{topic}, nil)
if err != nil {
- log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
+ log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err)
} else {
- log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
- topic_ok = true
+ log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId)
+ topicOk = true
}
}
}
}
- log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
+ log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
- var event_chan = make(chan int)
+ var eventChan = make(chan int)
go func() {
for {
select {
switch evt.(type) {
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", evt)
- token, err := Fetch_token()
+ token, err := FetchToken()
if err != nil {
- log.Warning("Cannot cannot fetch token: ", err)
+ log.Warning(fetchTokenErrorMessage, err)
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
+ log.Warning(setTokenErrorMessage, setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
- log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
+ log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String())
}
- case msg := <-event_chan:
+ case msg := <-eventChan:
if msg == 0 {
return
}
for {
for {
select {
- case reader_ctrl := <-control_ch:
- if reader_ctrl.Command == "EXIT" {
- event_chan <- 0
- log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
- data_ch <- nil //Signal to job handler
+ case readerCtrl := <-controlCh:
+ if readerCtrl.Command == "EXIT" {
+ eventChan <- 0
+ log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped")
+ dataCh <- nil //Signal to job handler
defer c.Close()
return
}
ev := c.Poll(1000)
if ev == nil {
- log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
+ log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic)
continue
}
switch e := ev.(type) {
c.Commit()
- data_ch <- &kmsg
+ dataCh <- &kmsg
log.Debug("Reader msg: ", &kmsg)
- log.Debug("Reader - data_ch ", data_ch)
+				log.Debug("Reader - dataCh ", dataCh)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", ev)
- token, err := Fetch_token()
+ token, err := FetchToken()
if err != nil {
- log.Warning("Cannot cannot fetch token: ", err)
+ log.Warning(fetchTokenErrorMessage, err)
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
+ log.Warning(setTokenErrorMessage, setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
}()
}
-func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {
+// This function intentionally has high cognitive complexity // NOSONAR
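+// StartTopicWriter starts the kafka producer and publishes payloads received on
+// dataCh to their target topics, retrying failed sends with increasing delay
+// before discarding a message; a nil payload on dataCh stops the writer.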
+func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
- var kafka_producer *kafka.Producer
+ var kafkaProducer *kafka.Producer
running := true
log.Info("Topic writer starting")
// Wait for kafka producer to become available - and be prepared to exit the writer
- for kafka_producer == nil {
+ for kafkaProducer == nil {
select {
- case writer_ctl := <-control_ch:
- if writer_ctl.Command == "EXIT" {
+ case writerCtl := <-controlCh:
+ if writerCtl.Command == "EXIT" {
//ignore cmd
}
default:
- kafka_producer = start_producer()
- if kafka_producer == nil {
+ kafkaProducer = startProducer()
+ if kafkaProducer == nil {
log.Debug("Could not start kafka producer - retrying")
time.Sleep(1 * time.Second)
} else {
}
}
- var event_chan = make(chan int)
+ var eventChan = make(chan int)
go func() {
for {
select {
- case evt := <-kafka_producer.Events():
+ case evt := <-kafkaProducer.Events():
switch evt.(type) {
case *kafka.Message:
m := evt.(*kafka.Message)
log.Debug("Dumping topic writer event, error: ", evt)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New producer token needed: ", evt)
- token, err := Fetch_token()
+ token, err := FetchToken()
if err != nil {
- log.Warning("Cannot cannot fetch token: ", err)
- kafka_producer.SetOAuthBearerTokenFailure(err.Error())
+ log.Warning(fetchTokenErrorMessage, err)
+ kafkaProducer.SetOAuthBearerTokenFailure(err.Error())
} else {
- setTokenError := kafka_producer.SetOAuthBearerToken(*token)
+ setTokenError := kafkaProducer.SetOAuthBearerToken(*token)
if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
- kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
+ log.Warning(setTokenErrorMessage, setTokenError)
+ kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
log.Debug("Dumping topic writer event, unknown: ", evt)
}
- case msg := <-event_chan:
+ case msg := <-eventChan:
if msg == 0 {
return
}
go func() {
for {
select {
- case writer_ctl := <-control_ch:
- if writer_ctl.Command == "EXIT" {
+ case writerCtl := <-controlCh:
+ if writerCtl.Command == "EXIT" {
// ignore - wait for channel signal
}
- case kmsg := <-data_ch:
+ case kmsg := <-dataCh:
if kmsg == nil {
- event_chan <- 0
+ eventChan <- 0
log.Info("Topic writer stopped by channel signal - start_topic_writer")
- defer kafka_producer.Close()
+ defer kafkaProducer.Close()
return
}
retries := 10
- msg_ok := false
+ msgOk := false
var err error
- for retry := 1; retry <= retries && msg_ok == false; retry++ {
- err = kafka_producer.Produce(&kafka.Message{
+					for retry := 1; retry <= retries && !msgOk; retry++ {
+ err = kafkaProducer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
if err == nil {
- msg_ok = true
+ msgOk = true
log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
} else {
log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
time.Sleep(time.Duration(retry) * time.Second)
}
}
- if !msg_ok {
+ if !msgOk {
log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
}
case <-time.After(1000 * time.Millisecond):
}()
}
-func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
+// This function intentionally has high cognitive complexity // NOSONAR
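+// createKafkaConsumer returns a consumer for the given type, configured for
+// plain text or SASL depending on creds_grant_type, or nil if creation fails.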
+func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
- log.Info("Creating kafka plain text consumer for type: ", type_id)
+ log.Info("Creating kafka plain text consumer for type: ", typeId)
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"group.id": gid,
"enable.auto.commit": false,
}
} else {
- log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
+ log.Info("Creating kafka SASL plain text consumer for type: ", typeId)
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"group.id": gid,
c, err := kafka.NewConsumer(&cm)
if err != nil {
- log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
+ log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err)
return nil
}
- log.Info("Created kafka consumer for type: ", type_id, " OK")
+ log.Info("Created kafka consumer for type: ", typeId, " OK")
return c
}
// Start kafka producer
-func start_producer() *kafka.Producer {
+// NOSONAR
+func startProducer() *kafka.Producer {
log.Info("Creating kafka producer")
var cm kafka.ConfigMap
return p
}
-func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
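+// StartJobXmlFileData consumes file-ready events for the given type from
+// data_in_ch and dispatches xml-to-json conversion jobs, bounded by jobLimiterChan.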
+func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
log.Info("Type job", type_id, " started")
- topic_list := make(map[string]string)
- topic_list[type_id] = "json-file-ready-kp"
- topic_list["PmData"] = "json-file-ready-kpadp"
+ topicList := make(map[string]string)
+ topicList[type_id] = "json-file-ready-kp"
+ topicList["PmData"] = "json-file-ready-kpadp"
running := true
for {
select {
- case job_ctl := <-control_ch:
- log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
- switch job_ctl.Command {
+ case jobCtl := <-control_ch:
+ log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
+ switch jobCtl.Command {
case "EXIT":
//ignore cmd - handled by channel signal
}
return
}
jobLimiterChan <- struct{}{}
- go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)
+ go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
case <-time.After(1 * time.Second):
if !running {
}
}
-func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
+// This function intentionally has more parameters for legacy compatibility // NOSONAR
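+// runXmlJob unmarshals the XmlFileEventHeader, converts the collected file to
+// json via the minio collector, and publishes a FileDownloadedEvt to every
+// topic in topicList.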
+func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
defer func() {
<-jobLimiterChan
}()
start := time.Now()
- var evt_data dataTypes.XmlFileEventHeader
+ var evtData dataTypes.XmlFileEventHeader
- err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
+ err := jsoniter.Unmarshal(msg.Msg.Value, &evtData)
if err != nil {
- log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
+ log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err)
return
}
- log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
+ log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String())
start = time.Now()
- new_fn := miniocollector.Xml_to_json_conv(&evt_data)
+ newFn := miniocollector.XmlToJsonConv(&evtData)
if err != nil {
- log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
+ log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err)
return
}
- log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String())
+	log.Debug("Converted file to json: ", newFn, " time: ", time.Since(start).String())
var fde dataTypes.FileDownloadedEvt
- fde.Filename = new_fn
+ fde.Filename = newFn
j, err := jsoniter.Marshal(fde)
if err != nil {
}
msg.Msg.Value = j
- msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
+ msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"")
log.Debug("Marshal file-collect event ", time.Since(start).String())
- log.Debug("Sending file-collect event to output topic(s)", len(topic_list))
- for _, v := range topic_list {
+	log.Debug("Sending file-collect event to output topic(s): ", len(topicList))
+ for _, v := range topicList {
fmt.Println("Output Topic: " + v)
var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
kmsg.Msg = msg.Msg
kmsg.Topic = v
- data_out_channel <- kmsg
+ dataOutChannel <- kmsg
}
}
-func Fetch_token() (*kafka.OAuthBearerToken, error) {
+// NOSONAR
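+// FetchToken retrieves an OAuth bearer token for the kafka client using the
+// client credentials grant.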
+func FetchToken() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{
ClientID: creds_client_id,
return nil, err
}
extensions := map[string]string{}
+
log.Debug("=====================================================")
log.Debug("token: ", token)
log.Debug("=====================================================")
Terminating
)
+const registeringProducer = "Registering producer: "
+
// == Main ==//
func main() {
producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
}
- go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
+ go kafkacollector.StartTopicWriter(writer_control, data_out_channel)
//Setup proc for periodic type registration
- var event_chan = make(chan int) //Channel for stopping the proc
- go periodic_registration(event_chan)
+ var eventChan = make(chan int) //Channel for stopping the proc
+ go periodicRegistration(eventChan)
//Wait for term/int signal do try to shut down gracefully
sigs := make(chan os.Signal, 1)
go func() {
sig := <-sigs
fmt.Printf("Received signal %s - application will terminate\n", sig)
- event_chan <- 0 // Stop periodic registration
+ eventChan <- 0 // Stop periodic registration
datalock.Lock()
defer datalock.Unlock()
AppState = Terminating
// == Core functions ==//
// Run periodic registration of producers
-func periodic_registration(evtch chan int) {
+func periodicRegistration(evtch chan int) {
var delay int = 1
for {
select {
return
}
case <-time.After(time.Duration(delay) * time.Second):
- ok := register_producer()
+ ok := registerProducer()
if ok {
delay = registration_delay_long
} else {
}
}
-func register_producer() bool {
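+// registerProducer reads the producer configuration file, registers each
+// produced info type with the ICS at ics_server and starts a type job per
+// type; it returns false if any step fails.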
+func registerProducer() bool {
- log.Info("Registering producer: ", producer_instance_name)
+ log.Info(registeringProducer, producer_instance_name)
file, err := os.ReadFile(config_file)
if err != nil {
log.Error("Cannot read config file: ", config_file)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
return false
}
data := dataTypes.DataTypes{}
err = jsoniter.Unmarshal([]byte(file), &data)
if err != nil {
log.Error("Cannot parse config file: ", config_file)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
return false
}
- var new_type_names []string
+ var newTypeNames []string
for i := 0; i < len(data.ProdDataTypes); i++ {
t1 := make(map[string]interface{})
json, err := jsoniter.Marshal(t1)
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
return false
} else {
- ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
+ ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
+ // NOSONAR
+ log.Error(registeringProducer, producer_instance_name, " - failed")
return false
}
- new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
+ newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
}
}
- log.Debug("Registering types: ", new_type_names)
+ log.Debug("Registering types: ", newTypeNames)
datalock.Lock()
defer datalock.Unlock()
for _, v := range data.ProdDataTypes {
log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
- start_type_job(v)
+ startTypeJob(v)
}
dataTypes.InfoTypes = data
log.Debug("Datatypes: ", dataTypes.InfoTypes)
- log.Info("Registering producer: ", producer_instance_name, " - OK")
+ log.Info(registeringProducer, producer_instance_name, " - OK")
return true
}
-func start_type_job(dp dataTypes.DataType) {
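+// startTypeJob creates the control and data channels for the given data type,
+// starts the matching job and topic reader goroutines, and records the job in
+// dataTypes.TypeJobs.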
+func startTypeJob(dp dataTypes.DataType) {
log.Info("Starting type job: ", dp.ID)
- job_record := dataTypes.TypeJobRecord{}
+ jobRecord := dataTypes.TypeJobRecord{}
- job_record.Job_control = make(chan dataTypes.JobControl, 1)
- job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
- job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
- job_record.InfoType = dp.ID
- job_record.InputTopic = dp.KafkaInputTopic
- job_record.GroupId = "kafka-procon-" + dp.ID
- job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
+ jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
+ jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
+ jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
+ jobRecord.InfoType = dp.ID
+ jobRecord.InputTopic = dp.KafkaInputTopic
+ jobRecord.GroupId = "kafka-procon-" + dp.ID
+ jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
switch dp.ID {
case "xml-file-data-to-filestore":
- go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
+ go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
case "xml-file-data":
- go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
+ go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
default:
}
- go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
+ go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)
- dataTypes.TypeJobs[dp.ID] = job_record
+ dataTypes.TypeJobs[dp.ID] = jobRecord
log.Debug("Type job input type: ", dp.InputJobType)
}