import (
"context"
"fmt"
+ "main/common/dataTypes"
+ "main/components/miniocollector"
+ "os"
+ "time"
+
"github.com/confluentinc/confluent-kafka-go/kafka"
jsoniter "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
"golang.org/x/oauth2/clientcredentials"
- "main/common/dataTypes"
- "main/components/miniocollector"
- "os"
- "time"
)
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
const parallelism_limiter = 100 //For all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
+// noinspection GoCognitiveComplexity
func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)