// Limiter - valid for all jobs
// jobLimiterChan acts as a counting semaphore bounding process-wide job
// concurrency: its capacity (parallelism_limiter) is the maximum number of
// jobs that may hold a slot at once. Presumably each job sends a struct{}{}
// to acquire and receives to release — verify at the call sites, which are
// outside this view.
// NOTE(review): Go convention is MixedCaps (parallelismLimiter); left
// unchanged here to avoid breaking references elsewhere in the file.
const parallelism_limiter = 100 //For all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
+var spawnNewTopicReaders = false
// Shared log-message fragments, factored out so the literal text is written
// once. typeLabel is interpolated into the reader's startup/ready log lines;
// fetchTokenErrorMessage is presumably the prefix for token-fetch error
// logs — its use site is not visible in this view.
const typeLabel = " for type: "
const fetchTokenErrorMessage = "Cannot fetch token: "
// Announce startup for this (topic, typeId) pair, then initialize the flag
// driving the setup loop below: topicOk stays false until the topic
// subscription is successfully established, and the loop retries until then.
log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
topicOk := false
- var c *kafka.Consumer = nil
+
running := true
+ spawnNewTopicReaders = true
+ var c *kafka.Consumer = nil
for topicOk == false {
-
select {
case readerCtrl := <-controlCh:
if readerCtrl.Command == "EXIT" {
}
log.Info("Topic reader ready on topic: ", topic, typeLabel, typeId)
+ spawnNewTopicReaders = false
var eventChan = make(chan int)
go func() {
for {
+ //Kill the subroutine when new subroutine gets created
+ if spawnNewTopicReaders {
+ return
+ }
+
select {
case evt := <-c.Events():
switch evt.(type) {
go func() {
for {
for {
+ //Kill the subroutine when new subroutine gets created
+ if spawnNewTopicReaders {
+ //Closed the kafka consumer associated with the routine
+ c.Close()
+ return
+ }
select {
case readerCtrl := <-controlCh:
if readerCtrl.Command == "EXIT" {