// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
+
+//nolint:all
package kafkacollector
import (
const fetchTokenErrorMessage = "Cannot fetch token: "
const setTokenErrorMessage = "Cannot set token: "
-// This function intentionally has high cognitive complexity // NOSONAR
+//nolint:all
func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
}()
}
-// This function intentionally has high cognitive complexity // NOSONAR
+//nolint:all
func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
var kafkaProducer *kafka.Producer
}()
}
-// This function intentionally has high cognitive complexity // NOSONAR
+//nolint:all
func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
return c
}
-// Start kafka producer
-// NOSONAR
+//nolint:all
func startProducer() *kafka.Producer {
log.Info("Creating kafka producer")
return p
}
-func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+//nolint:all
+func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutChannel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
- log.Info("Type job", type_id, " started")
+ log.Info("Type job", typeId, " started")
topicList := make(map[string]string)
- topicList[type_id] = "json-file-ready-kp"
+ topicList[typeId] = "json-file-ready-kp"
topicList["PmData"] = "json-file-ready-kpadp"
running := true
for {
select {
- case jobCtl := <-control_ch:
- log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
+ case jobCtl := <-controlCh:
+ log.Debug("Type job ", typeId, " new cmd received ", jobCtl.Command)
switch jobCtl.Command {
case "EXIT":
//ignore cmd - handled by channel signal
}
- case msg := <-data_in_ch:
+ case msg := <-dataInCh:
if msg == nil {
- log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
+ log.Info("Type job ", typeId, " stopped by channel signal - start_job_xml_file_data")
running = false
return
}
jobLimiterChan <- struct{}{}
- go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
+ go runXmlJob(typeId, msg, "gz", dataOutChannel, topicList, jobLimiterChan, fvolume, fsbucket)
case <-time.After(1 * time.Second):
if !running {
}
}
-// This function intentionally has more parameters for legacy compatibility // NOSONAR
+//nolint:all
func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
defer func() {
<-jobLimiterChan
}
}
-// NOSONAR
+//nolint:all
func FetchToken() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{