From: aravind.est Date: Wed, 9 Oct 2024 12:43:29 +0000 (+0100) Subject: Fix increased cpu usage in pm-file-converter X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=fde4b02c6bdb1195d179af53f539982408a04997;p=nonrtric%2Fplt%2Franpm.git Fix increased cpu usage in pm-file-converter Go subroutines removed when new subroutes gets created Issue-ID: NONRTRIC-1036 Change-Id: I42e61ed7557cb31c27d10693aa3b760716343f8a Signed-off-by: aravind.est --- diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go index a3c911e..423c591 100644 --- a/pm-file-converter/components/kafkacollector/kafkacollector.go +++ b/pm-file-converter/components/kafkacollector/kafkacollector.go @@ -44,6 +44,7 @@ var creds_service_url = os.Getenv("AUTH_SERVICE_URL") // Limiter - valid for all jobs const parallelism_limiter = 100 //For all jobs var jobLimiterChan = make(chan struct{}, parallelism_limiter) +var spawnNewTopicReaders = false const typeLabel = " for type: " const fetchTokenErrorMessage = "Cannot fetch token: " @@ -55,11 +56,12 @@ func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.Read log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId) topicOk := false - var c *kafka.Consumer = nil + running := true + spawnNewTopicReaders = true + var c *kafka.Consumer = nil for topicOk == false { - select { case readerCtrl := <-controlCh: if readerCtrl.Command == "EXIT" { @@ -93,9 +95,15 @@ func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.Read } log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId) + spawnNewTopicReaders = false var eventChan = make(chan int) go func() { for { + //Kill the subroutine when new subroutine gets created + if spawnNewTopicReaders { + return + } + select { case evt := <-c.Events(): switch evt.(type) { @@ -131,6 +139,12 @@ func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.Read go func() { for { for { + //Kill the subroutine when new subroutine gets created + if spawnNewTopicReaders { + //Closed the kafka consumer associated with the routine + c.Close() + return + } select { case readerCtrl := <-controlCh: if readerCtrl.Command == "EXIT" {