Issue-ID: NONRTRIC-879
Change-Id: Ie98efb460527fd29d9051c259e3f36b9b43f5bc3
Signed-off-by: ambrishest <ambrish.singh@est.tech>
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
package kafkacollector
import (
package kafkacollector
import (
const fetchTokenErrorMessage = "Cannot fetch token: "
const setTokenErrorMessage = "Cannot set token: "
const fetchTokenErrorMessage = "Cannot fetch token: "
const setTokenErrorMessage = "Cannot set token: "
-// This function intentionally has high cognitive complexity // NOSONAR
+// This function intentionally has high cognitive complexity //NOSONAR
func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
func StartTopicReader(topic string, typeId string, controlCh chan dataTypes.ReaderControl, dataCh chan *dataTypes.KafkaPayload, gid string, cid string) {
log.Info("Topic reader starting, topic: ", topic, typeLabel, typeId)
-// This function intentionally has high cognitive complexity // NOSONAR
+// This function intentionally has high cognitive complexity //NOSONAR
func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
var kafkaProducer *kafka.Producer
func StartTopicWriter(controlCh chan dataTypes.WriterControl, dataCh chan *dataTypes.KafkaPayload) {
var kafkaProducer *kafka.Producer
-// This function intentionally has high cognitive complexity // NOSONAR
+// We need to pass the kafka properties in this way for readability purpose //NOSONAR
func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
func createKafkaConsumer(typeId string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
-// Start kafka producer
-// NOSONAR
+// We need to pass the kafka properties in this way for readability purpose //NOSONAR
func startProducer() *kafka.Producer {
log.Info("Creating kafka producer")
func startProducer() *kafka.Producer {
log.Info("Creating kafka producer")
-func StartJobXmlFileData(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+func StartJobXmlFileData(typeId string, controlCh chan dataTypes.JobControl, dataInCh chan *dataTypes.KafkaPayload, dataOutChannel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
- log.Info("Type job", type_id, " started")
+ log.Info("Type job", typeId, " started")
topicList := make(map[string]string)
topicList := make(map[string]string)
- topicList[type_id] = "json-file-ready-kp"
+ topicList[typeId] = "json-file-ready-kp"
topicList["PmData"] = "json-file-ready-kpadp"
running := true
for {
select {
topicList["PmData"] = "json-file-ready-kpadp"
running := true
for {
select {
- case jobCtl := <-control_ch:
- log.Debug("Type job ", type_id, " new cmd received ", jobCtl.Command)
+ case jobCtl := <-controlCh:
+ log.Debug("Type job ", typeId, " new cmd received ", jobCtl.Command)
switch jobCtl.Command {
case "EXIT":
//ignore cmd - handled by channel signal
}
switch jobCtl.Command {
case "EXIT":
//ignore cmd - handled by channel signal
}
- case msg := <-data_in_ch:
+ case msg := <-dataInCh:
- log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
+ log.Info("Type job ", typeId, " stopped by channel signal - start_job_xml_file_data")
running = false
return
}
jobLimiterChan <- struct{}{}
running = false
return
}
jobLimiterChan <- struct{}{}
- go runXmlJob(type_id, msg, "gz", data_out_channel, topicList, jobLimiterChan, fvolume, fsbucket)
+ go runXmlJob(typeId, msg, "gz", dataOutChannel, topicList, jobLimiterChan, fvolume, fsbucket)
case <-time.After(1 * time.Second):
if !running {
case <-time.After(1 * time.Second):
if !running {
-// This function intentionally has more parameters for legacy compatibility // NOSONAR
+// This function intentionally has more parameters for legacy compatibility //NOSONAR
func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
defer func() {
<-jobLimiterChan
func runXmlJob(typeId string, msg *dataTypes.KafkaPayload, outputCompression string, dataOutChannel chan *dataTypes.KafkaPayload, topicList map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
defer func() {
<-jobLimiterChan
+// This function intentionally has some patterns in logs for easier identification //NOSONAR
func FetchToken() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{
func FetchToken() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{
const registration_delay_short = 2
const registration_delay_long = 120
const registration_delay_short = 2
const registration_delay_long = 120
+const failedMessageLabel = " - failed"
+
//== Variables ==//
var AppState = Init
//== Variables ==//
var AppState = Init
if err != nil {
log.Error("Cannot read config file: ", config_file)
// NOSONAR
if err != nil {
log.Error("Cannot read config file: ", config_file)
// NOSONAR
- log.Error(registeringProducer, producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
}
data := dataTypes.DataTypes{}
return false
}
data := dataTypes.DataTypes{}
if err != nil {
log.Error("Cannot parse config file: ", config_file)
// NOSONAR
if err != nil {
log.Error("Cannot parse config file: ", config_file)
// NOSONAR
- log.Error(registeringProducer, producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
}
var newTypeNames []string
return false
}
var newTypeNames []string
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
// NOSONAR
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
// NOSONAR
- log.Error(registeringProducer, producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
} else {
ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
// NOSONAR
return false
} else {
ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
// NOSONAR
- log.Error(registeringProducer, producer_instance_name, " - failed")
+ log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
}
newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
return false
}
newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)