X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pm-file-converter%2Fcomponents%2Fkafkacollector%2Fkafkacollector.go;h=a3c911ef1a4eb1752dc1be4b2f0ec500218dee65;hb=HEAD;hp=a7eaebfa5a7cf6f6cdb89c27fafacb292c686662;hpb=6e3dc81c76fc88e000b83c35282d3db823f815a3;p=nonrtric%2Fplt%2Franpm.git

diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go
index a7eaebf..a3c911e 100644
--- a/pm-file-converter/components/kafkacollector/kafkacollector.go
+++ b/pm-file-converter/components/kafkacollector/kafkacollector.go
@@ -17,6 +17,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 // ========================LICENSE_END===================================
+
+//nolint:all
 package kafkacollector
 
 import (
@@ -47,7 +49,7 @@ const typeLabel = " for type: "
 const fetchTokenErrorMessage = "Cannot fetch token: "
 const setTokenErrorMessage = "Cannot set token: "
 
-// This function intentionally has high cognitive complexity // NOSONAR
+//nolint:all
 func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
 
 	log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
@@ -180,7 +182,7 @@ func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.Read
 	}()
 }
 
-// This function intentionally has high cognitive complexity // NOSONAR
+//nolint:all
 func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
 
 	var kafkaProducer *kafka.Producer
@@ -294,7 +296,7 @@ func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataT
 	}()
 }
 
-// This function intentionally has high cognitive complexity // NOSONAR
+//nolint:all
 func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
 	var cm kafka.ConfigMap
 	if creds_grant_type == "" {
@@ -329,8 +331,7 @@ func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer
 	return c
 }
 
-// Start kafka producer
-// NOSONAR
+//nolint:all
 func startProducer() *kafka.Producer {
 
 	log.Info("Creating kafka producer")
@@ -357,31 +358,32 @@ func startProducer() *kafka.Producer {
 	return p
 }
 
-func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+//nolint:all
+func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutChannel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
 
-	log.Info("Type job", type_id, " started")
+	log.Info("Type job", typeId, " started")
 	topicList := make(map[string]string)
-	topicList[type_id] = "json-file-ready-kp"
+	topicList[typeId] = "json-file-ready-kp"
 	topicList["PmData"] = "json-file-ready-kpadp"
 	running := true
 	for {
 		select {
-		case jobCtl := <-control_ch:
-			log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
+		case jobCtl := <-controlCh:
+			log.Debug("Type job ", typeId, " new cmd received ", jobCtl.Command)
 			switch jobCtl.Command {
 			case "EXIT":
 				//ignore cmd - handled by channel signal
 			}
 
-		case msg := <-data_in_ch:
+		case msg := <-dataInCh:
 			if msg == nil {
-				log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
+				log.Info("Type job ", typeId, " stopped by channel signal - start_job_xml_file_data")
 				running = false
 				return
 			}
 
 			jobLimiterChan <- struct{}{}
-			go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
+			go runXmlJob(typeId, msg, "gz", dataOutChannel, topicList, jobLimiterChan, fvolume, fsbucket)
 
 		case <-time.After(1 * time.Second):
 			if !running {
@@ -391,7 +393,7 @@ func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, d
 	}
 }
 
-// This function intentionally has more parameters for legacy compatibility // NOSONAR
+//nolint:all
 func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
 	defer func() {
 		<-jobLimiterChan
@@ -436,7 +438,7 @@ func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression str
 	}
 }
 
-// NOSONAR
+//nolint:all
 func FetchToken() (*kafka.OAuthBearerToken, error) {
 	log.Debug("Get token inline")
 	conf := &clientcredentials.Config{
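
Note on the concurrency pattern visible in the StartJobXmlFileData hunk: the send "jobLimiterChan <- struct{}{}" before "go runXmlJob(...)", paired with runXmlJob draining the channel in its defer, is a buffered-channel semaphore that caps the number of XML conversion jobs in flight. A minimal standalone sketch of the pattern follows; the maxJobs capacity and the process function are illustrative, since the channel's creation site is outside this diff.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // process stands in for the per-file work that runXmlJob performs.
    func process(id int) {
    	fmt.Println("processing file", id)
    }

    func main() {
    	// Assumed capacity; in the collector the real limit is set
    	// wherever jobLimiterChan is created, not shown in this diff.
    	const maxJobs = 10

    	// A buffered channel used as a counting semaphore: a send
    	// blocks once maxJobs tokens are outstanding.
    	limiter := make(chan struct{}, maxJobs)

    	var wg sync.WaitGroup
    	for i := 0; i < 100; i++ {
    		limiter <- struct{}{} // acquire a slot before spawning the job
    		wg.Add(1)
    		go func(id int) {
    			defer wg.Done()
    			defer func() { <-limiter }() // release the slot, as runXmlJob's defer does
    			process(id)
    		}(i)
    	}
    	wg.Wait()
    }

Because the acquire happens in the dispatching loop rather than inside the goroutine, a full limiter applies backpressure to the Kafka read path instead of piling up idle goroutines.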
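
The final hunk touches FetchToken, which builds a clientcredentials.Config (golang.org/x/oauth2/clientcredentials) and returns a kafka.OAuthBearerToken. A sketch of that client-credentials fetch follows, under assumed values: the credentials and token URL below are hypothetical, since only the creds_grant_type variable is visible in this diff and the real values come from the collector's configuration.

    package main

    import (
    	"context"
    	"fmt"

    	"golang.org/x/oauth2/clientcredentials"
    )

    func main() {
    	// Hypothetical credentials and token endpoint; the collector
    	// reads its values from configuration not shown in this diff.
    	conf := &clientcredentials.Config{
    		ClientID:     "pm-file-converter",
    		ClientSecret: "secret",
    		TokenURL:     "https://auth.example.com/realms/example/protocol/openid-connect/token",
    	}

    	token, err := conf.Token(context.Background())
    	if err != nil {
    		fmt.Println("Cannot fetch token:", err) // mirrors fetchTokenErrorMessage
    		return
    	}
    	fmt.Println("access token expires:", token.Expiry)
    	// FetchToken then wraps the access token and expiry in a
    	// kafka.OAuthBearerToken for the confluent-kafka-go client.
    }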