summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
ba3389b)
Issue-ID: NONRTRIC-879
Change-Id: I5720bdfaeb3760d7ce0b8bfdaff43ab6896cf052
Signed-off-by: ambrishest <ambrish.singh@est.tech>
// limitations under the License.
// ========================LICENSE_END===================================
// limitations under the License.
// ========================LICENSE_END===================================
package kafkacollector
import (
package kafkacollector
import (
const fetchTokenErrorMessage = "Cannot fetch token: "
const setTokenErrorMessage = "Cannot set token: "
const fetchTokenErrorMessage = "Cannot fetch token: "
const setTokenErrorMessage = "Cannot set token: "
-// This function intentionally has high cognitive complexity //NOSONAR
func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
-// This function intentionally has high cognitive complexity //NOSONAR
func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
var kafkaProducer *kafka.Producer
func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
var kafkaProducer *kafka.Producer
-// We need to pass the kafka properties in this way for readability purpose //NOSONAR
func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
-// We need to pass the kafka properties in this way for readability purpose //NOSONAR
func startProducer() *kafka.Producer {
log.Info("Creating kafka producer")
func startProducer() *kafka.Producer {
log.Info("Creating kafka producer")
func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutChannel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
log.Info("Type job", typeId, " started")
func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutChannel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
log.Info("Type job", typeId, " started")
-// This function intentionally has more parameters for legacy compatibility //NOSONAR
func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
defer func() {
<-jobLimiterChan
func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
defer func() {
<-jobLimiterChan
-// This function intentionally has some patterns in logs for easier identification //NOSONAR
func FetchToken() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{
func FetchToken() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{