X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pm-file-converter%2Fcomponents%2Fkafkacollector%2Fkafkacollector.go;h=a7eaebfa5a7cf6f6cdb89c27fafacb292c686662;hb=refs%2Fchanges%2F57%2F11857%2F2;hp=7451dd2e605623fe3bdb2dc47a2c6c4209f55ebc;hpb=4dd29883b619182fb43ebc4565266831d2a3b79e;p=nonrtric%2Fplt%2Franpm.git diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go index 7451dd2..a7eaebf 100644 --- a/pm-file-converter/components/kafkacollector/kafkacollector.go +++ b/pm-file-converter/components/kafkacollector/kafkacollector.go @@ -43,22 +43,26 @@ var creds_service_url = os.Getenv("AUTH_SERVICE_URL") const parallelism_limiter = 100 //For all jobs var jobLimiterChan = make(chan struct{}, parallelism_limiter) -// noinspection GoCognitiveComplexity -func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) { +const typeLabel = " for type: " +const fetchTokenErrorMessage = "Cannot fetch token: " +const setTokenErrorMessage = "Cannot set token: " - log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id) +// This function intentionally has high cognitive complexity // NOSONAR +func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) { - topic_ok := false + log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId) + + topicOk := false var c *kafka.Consumer = nil running := true - for topic_ok == false { + for topicOk == false { select { - case reader_ctrl := <-control_ch: - if reader_ctrl.Command == "EXIT" { - log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") - data_ch <- nil //Signal to job handler + case readerCtrl := <-controlCh: + if readerCtrl.Command == "EXIT" { + log.Info("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped") + dataCh <- nil //Signal to job handler running = false return } @@ -67,27 +71,27 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. return } if c == nil { - c = create_kafka_consumer(type_id, gid, cid) + c = createKafkaConsumer(typeId, gid, cid) if c == nil { - log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying") + log.Info("Cannot start consumer on topic: ", topic, typeLabel, typeId, " - retrying") } else { - log.Info("Consumer started on topic: ", topic, " for type: ", type_id) + log.Info("Consumer started on topic: ", topic, typeLabel, typeId) } } - if c != nil && topic_ok == false { + if c != nil && topicOk == false { err := c.SubscribeTopics([]string{topic}, nil) if err != nil { - log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err) + log.Info("Topic reader cannot start subscribing on topic: ", topic, typeLabel, typeId, " - retrying -- error details: ", err) } else { - log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id) - topic_ok = true + log.Info("Topic reader subscribing on topic: ", topic, typeLabel, typeId) + topicOk = true } } } } - log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id) + log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId) - var event_chan = make(chan int) + var eventChan = make(chan int) go func() { for { select { @@ -95,22 +99,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. switch evt.(type) { case kafka.OAuthBearerTokenRefresh: log.Debug("New consumer token needed: ", evt) - token, err := Fetch_token() + token, err := FetchToken() if err != nil { - log.Warning("Cannot cannot fetch token: ", err) + log.Warning(fetchTokenErrorMessage, err) c.SetOAuthBearerTokenFailure(err.Error()) } else { setTokenError := c.SetOAuthBearerToken(*token) if setTokenError != nil { - log.Warning("Cannot cannot set token: ", setTokenError) + log.Warning(setTokenErrorMessage, setTokenError) c.SetOAuthBearerTokenFailure(setTokenError.Error()) } } default: - log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String()) + log.Debug("Dumping topic reader event on topic: ", topic, typeLabel, typeId, " evt: ", evt.String()) } - case msg := <-event_chan: + case msg := <-eventChan: if msg == 0 { return } @@ -126,11 +130,11 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. for { for { select { - case reader_ctrl := <-control_ch: - if reader_ctrl.Command == "EXIT" { - event_chan <- 0 - log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") - data_ch <- nil //Signal to job handler + case readerCtrl := <-controlCh: + if readerCtrl.Command == "EXIT" { + eventChan <- 0 + log.Debug("Topic reader on topic: ", topic, typeLabel, typeId, " - stopped") + dataCh <- nil //Signal to job handler defer c.Close() return } @@ -138,7 +142,7 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. ev := c.Poll(1000) if ev == nil { - log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic) + log.Debug("Topic Reader for type: ", typeId, " Nothing to consume on topic: ", topic) continue } switch e := ev.(type) { @@ -148,22 +152,22 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. c.Commit() - data_ch <- &kmsg + dataCh <- &kmsg log.Debug("Reader msg: ", &kmsg) - log.Debug("Reader - data_ch ", data_ch) + log.Debug("Reader - data_ch ", dataCh) case kafka.Error: fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) case kafka.OAuthBearerTokenRefresh: log.Debug("New consumer token needed: ", ev) - token, err := Fetch_token() + token, err := FetchToken() if err != nil { - log.Warning("Cannot cannot fetch token: ", err) + log.Warning(fetchTokenErrorMessage, err) c.SetOAuthBearerTokenFailure(err.Error()) } else { setTokenError := c.SetOAuthBearerToken(*token) if setTokenError != nil { - log.Warning("Cannot cannot set token: ", setTokenError) + log.Warning(setTokenErrorMessage, setTokenError) c.SetOAuthBearerTokenFailure(setTokenError.Error()) } } @@ -176,23 +180,24 @@ func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes. }() } -func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) { +// This function intentionally has high cognitive complexity // NOSONAR +func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) { - var kafka_producer *kafka.Producer + var kafkaProducer *kafka.Producer running := true log.Info("Topic writer starting") // Wait for kafka producer to become available - and be prepared to exit the writer - for kafka_producer == nil { + for kafkaProducer == nil { select { - case writer_ctl := <-control_ch: - if writer_ctl.Command == "EXIT" { + case writerCtl := <-controlCh: + if writerCtl.Command == "EXIT" { //ignore cmd } default: - kafka_producer = start_producer() - if kafka_producer == nil { + kafkaProducer = startProducer() + if kafkaProducer == nil { log.Debug("Could not start kafka producer - retrying") time.Sleep(1 * time.Second) } else { @@ -201,11 +206,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d } } - var event_chan = make(chan int) + var eventChan = make(chan int) go func() { for { select { - case evt := <-kafka_producer.Events(): + case evt := <-kafkaProducer.Events(): switch evt.(type) { case *kafka.Message: m := evt.(*kafka.Message) @@ -219,22 +224,22 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d log.Debug("Dumping topic writer event, error: ", evt) case kafka.OAuthBearerTokenRefresh: log.Debug("New producer token needed: ", evt) - token, err := Fetch_token() + token, err := FetchToken() if err != nil { - log.Warning("Cannot cannot fetch token: ", err) - kafka_producer.SetOAuthBearerTokenFailure(err.Error()) + log.Warning(fetchTokenErrorMessage, err) + kafkaProducer.SetOAuthBearerTokenFailure(err.Error()) } else { - setTokenError := kafka_producer.SetOAuthBearerToken(*token) + setTokenError := kafkaProducer.SetOAuthBearerToken(*token) if setTokenError != nil { - log.Warning("Cannot cannot set token: ", setTokenError) - kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error()) + log.Warning(setTokenErrorMessage, setTokenError) + kafkaProducer.SetOAuthBearerTokenFailure(setTokenError.Error()) } } default: log.Debug("Dumping topic writer event, unknown: ", evt) } - case msg := <-event_chan: + case msg := <-eventChan: if msg == 0 { return } @@ -248,36 +253,36 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d go func() { for { select { - case writer_ctl := <-control_ch: - if writer_ctl.Command == "EXIT" { + case writerCtl := <-controlCh: + if writerCtl.Command == "EXIT" { // ignore - wait for channel signal } - case kmsg := <-data_ch: + case kmsg := <-dataCh: if kmsg == nil { - event_chan <- 0 + eventChan <- 0 log.Info("Topic writer stopped by channel signal - start_topic_writer") - defer kafka_producer.Close() + defer kafkaProducer.Close() return } retries := 10 - msg_ok := false + msgOk := false var err error - for retry := 1; retry <= retries && msg_ok == false; retry++ { - err = kafka_producer.Produce(&kafka.Message{ + for retry := 1; retry <= retries && msgOk == false; retry++ { + err = kafkaProducer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny}, Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil) if err == nil { - msg_ok = true + msgOk = true log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic) } else { log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err) time.Sleep(time.Duration(retry) * time.Second) } } - if !msg_ok { + if !msgOk { log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err) } case <-time.After(1000 * time.Millisecond): @@ -289,10 +294,11 @@ func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *d }() } -func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer { +// This function intentionally has high cognitive complexity // NOSONAR +func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer { var cm kafka.ConfigMap if creds_grant_type == "" { - log.Info("Creating kafka plain text consumer for type: ", type_id) + log.Info("Creating kafka plain text consumer for type: ", typeId) cm = kafka.ConfigMap{ "bootstrap.servers": bootstrapserver, "group.id": gid, @@ -301,7 +307,7 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum "enable.auto.commit": false, } } else { - log.Info("Creating kafka SASL plain text consumer for type: ", type_id) + log.Info("Creating kafka SASL plain text consumer for type: ", typeId) cm = kafka.ConfigMap{ "bootstrap.servers": bootstrapserver, "group.id": gid, @@ -315,16 +321,17 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum c, err := kafka.NewConsumer(&cm) if err != nil { - log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err) + log.Error("Cannot create kafka consumer for type: ", typeId, ", error details: ", err) return nil } - log.Info("Created kafka consumer for type: ", type_id, " OK") + log.Info("Created kafka consumer for type: ", typeId, " OK") return c } // Start kafka producer -func start_producer() *kafka.Producer { +// NOSONAR +func startProducer() *kafka.Producer { log.Info("Creating kafka producer") var cm kafka.ConfigMap @@ -350,18 +357,18 @@ func start_producer() *kafka.Producer { return p } -func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) { +func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) { log.Info("Type job", type_id, " started") - topic_list := make(map[string]string) - topic_list[type_id] = "json-file-ready-kp" - topic_list["PmData"] = "json-file-ready-kpadp" + topicList := make(map[string]string) + topicList[type_id] = "json-file-ready-kp" + topicList["PmData"] = "json-file-ready-kpadp" running := true for { select { - case job_ctl := <-control_ch: - log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command) - switch job_ctl.Command { + case jobCtl := <-control_ch: + log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command) + switch jobCtl.Command { case "EXIT": //ignore cmd - handled by channel signal } @@ -374,7 +381,7 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro return } jobLimiterChan <- struct{}{} - go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket) + go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket) case <-time.After(1 * time.Second): if !running { @@ -384,30 +391,31 @@ func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobContro } } -func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) { +// This function intentionally has more parameters for legacy compatibility // NOSONAR +func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) { defer func() { <-jobLimiterChan }() start := time.Now() - var evt_data dataTypes.XmlFileEventHeader + var evtData dataTypes.XmlFileEventHeader - err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data) + err := jsoniter.Unmarshal(msg.Msg.Value, &evtData) if err != nil { - log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err) + log.Error("Cannot parse XmlFileEventHeader for type job: ", typeId, " - discarding message, error details", err) return } - log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String()) + log.Debug("Unmarshal file-collect event for type job: ", typeId, " time: ", time.Since(start).String()) start = time.Now() - new_fn := miniocollector.Xml_to_json_conv(&evt_data) + newFn := miniocollector.XmlToJsonConv(&evtData) if err != nil { - log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err) + log.Error("Cannot convert file ", evtData.Name, " - discarding message, ", err) return } - log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String()) + log.Debug("Converted file to json: ", newFn, " time", time.Since(start).String()) var fde dataTypes.FileDownloadedEvt - fde.Filename = new_fn + fde.Filename = newFn j, err := jsoniter.Marshal(fde) if err != nil { @@ -416,19 +424,20 @@ func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression } msg.Msg.Value = j - msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"") + msg.Msg.Key = []byte("\"" + evtData.SourceName + "\"") log.Debug("Marshal file-collect event ", time.Since(start).String()) - log.Debug("Sending file-collect event to output topic(s)", len(topic_list)) - for _, v := range topic_list { + log.Debug("Sending file-collect event to output topic(s)", len(topicList)) + for _, v := range topicList { fmt.Println("Output Topic: " + v) var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload) kmsg.Msg = msg.Msg kmsg.Topic = v - data_out_channel <- kmsg + dataOutChannel <- kmsg } } -func Fetch_token() (*kafka.OAuthBearerToken, error) { +// NOSONAR +func FetchToken() (*kafka.OAuthBearerToken, error) { log.Debug("Get token inline") conf := &clientcredentials.Config{ ClientID: creds_client_id, @@ -441,6 +450,7 @@ func Fetch_token() (*kafka.OAuthBearerToken, error) { return nil, err } extensions := map[string]string{} + log.Debug("=====================================================") log.Debug("token: ", token) log.Debug("=====================================================")