Fix increased cpu usage in pm-file-converter 68/13568/1
authoraravind.est <aravindhan.a@est.tech>
Wed, 9 Oct 2024 12:43:29 +0000 (13:43 +0100)
committeraravind.est <aravindhan.a@est.tech>
Wed, 9 Oct 2024 12:43:29 +0000 (13:43 +0100)
Go subroutines removed when new subroutes gets created

Issue-ID: NONRTRIC-1036
Change-Id: I42e61ed7557cb31c27d10693aa3b760716343f8a
Signed-off-by: aravind.est <aravindhan.a@est.tech>
pm-file-converter/components/kafkacollector/kafkacollector.go

index a3c911e..423c591 100644 (file)
@@ -44,6 +44,7 @@ var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
 // Limiter - valid for all jobs
 const parallelism_limiter = 100 //For all jobs
 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
+var spawnNewTopicReaders = false
 
 const typeLabel = " for type: "
 const fetchTokenErrorMessage = "Cannot fetch token: "
@@ -55,11 +56,12 @@ func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.Read
        log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
 
        topicOk := false
-       var c *kafka.Consumer = nil
+
        running := true
+       spawnNewTopicReaders = true
+       var c *kafka.Consumer = nil
 
        for topicOk == false {
-
                select {
                case readerCtrl := <-controlCh:
                        if readerCtrl.Command == "EXIT" {
@@ -93,9 +95,15 @@ func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.Read
        }
        log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
 
+       spawnNewTopicReaders = false
        var eventChan = make(chan int)
        go func() {
                for {
+                       //Kill the subroutine when new subroutine gets created
+                       if spawnNewTopicReaders {
+                               return
+                       }
+
                        select {
                        case evt := <-c.Events():
                                switch evt.(type) {
@@ -131,6 +139,12 @@ func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.Read
        go func() {
                for {
                        for {
+                               //Kill the subroutine when new subroutine gets created
+                               if spawnNewTopicReaders {
+                                       //Closed the kafka consumer associated with the routine
+                                       c.Close()
+                                       return
+                               }
                                select {
                                case readerCtrl := <-controlCh:
                                        if readerCtrl.Command == "EXIT" {