X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pm-file-converter%2Fcomponents%2Fkafkacollector%2Fkafkacollector.go;h=7451dd2e605623fe3bdb2dc47a2c6c4209f55ebc;hb=refs%2Fchanges%2F02%2F11802%2F2;hp=d70114c55ccc61d89dcb8a40c7d129e0b0527f16;hpb=696b3a4eb9eecf92b99e62e0081f7140c0294155;p=nonrtric%2Fplt%2Franpm.git diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go index d70114c..7451dd2 100644 --- a/pm-file-converter/components/kafkacollector/kafkacollector.go +++ b/pm-file-converter/components/kafkacollector/kafkacollector.go @@ -22,14 +22,15 @@ package kafkacollector import ( "context" "fmt" + "main/common/dataTypes" + "main/components/miniocollector" + "os" + "time" + "github.com/confluentinc/confluent-kafka-go/kafka" jsoniter "github.com/json-iterator/go" log "github.com/sirupsen/logrus" "golang.org/x/oauth2/clientcredentials" - "main/common/dataTypes" - "main/components/miniocollector" - "os" - "time" ) var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE") @@ -42,6 +43,7 @@ var creds_service_url = os.Getenv("AUTH_SERVICE_URL") const parallelism_limiter = 100 //For all jobs var jobLimiterChan = make(chan struct{}, parallelism_limiter) +// noinspection GoCognitiveComplexity func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) { log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)