// - // // ========================LICENSE_START================================= // O-RAN-SC // %% // Copyright (C) 2023: Nordix Foundation. All rights reserved. // Copyright (C) 2023 OpenInfra Foundation Europe. All rights reserved. // %% // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ========================LICENSE_END=================================== package main import ( "fmt" "main/common/dataTypes" "main/common/utils" "main/components/kafkacollector" "net/http" "os" "os/signal" "runtime" "sync" "syscall" "time" jsoniter "github.com/json-iterator/go" log "github.com/sirupsen/logrus" ) var ics_server = os.Getenv("ICS") var self = os.Getenv("SELF") // This are optional - set if using SASL protocol is used towards kafka var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE") var bootstrapserver = os.Getenv("KAFKA_SERVER") const config_file = "application_configuration.json" const producer_name = "kafka-producer" var producer_instance_name string = producer_name const reader_queue_length = 100 //Per type job const writer_queue_length = 100 //Per info job var files_volume = os.Getenv("FILES_VOLUME") var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length) var writer_control = make(chan dataTypes.WriterControl, 1) const registration_delay_short = 2 const registration_delay_long = 120 const failedMessageLabel = " - failed" //== Variables ==// var AppState = Init // Lock for all internal data var datalock sync.Mutex const ( Init dataTypes.AppStates = iota Running Terminating ) const registeringProducer = "Registering producer: " // == Main ==// func main() { //log.SetLevel(log.InfoLevel) log.SetLevel(log.TraceLevel) log.Info("Server starting...") if self == "" { log.Panic("Env SELF not configured") } if bootstrapserver == "" { log.Panic("Env KAFKA_SERVER not set") } if ics_server == "" { log.Panic("Env ICS not set") } if os.Getenv("KP") != "" { producer_instance_name = producer_instance_name + "-" + os.Getenv("KP") } go kafkacollector.StartTopicWriter(writer_control, data_out_channel) //Setup proc for periodic type registration var eventChan = make(chan int) //Channel for stopping the proc go periodicRegistration(eventChan) //Wait for term/int signal do try to shut down gracefully sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigs fmt.Printf("Received signal %s - application will terminate\n", sig) eventChan <- 0 // Stop periodic registration datalock.Lock() defer datalock.Unlock() AppState = Terminating }() AppState = Running //Wait until all go routines has exited runtime.Goexit() fmt.Println("main routine exit") fmt.Println("server stopped") } // == Core functions ==// // Run periodic registration of producers func periodicRegistration(evtch chan int) { var delay int = 1 for { select { case msg := <-evtch: if msg == 0 { // Stop thread return } case <-time.After(time.Duration(delay) * time.Second): ok := registerProducer() if ok { delay = registration_delay_long } else { if delay < registration_delay_long { delay += registration_delay_short } else { delay = registration_delay_short } } } } } func registerProducer() bool { log.Info(registeringProducer, producer_instance_name) file, err := os.ReadFile(config_file) if err != nil { log.Error("Cannot read config file: ", config_file) // NOSONAR log.Error(registeringProducer, producer_instance_name, failedMessageLabel) return false } data := dataTypes.DataTypes{} err = jsoniter.Unmarshal([]byte(file), &data) if err != nil { log.Error("Cannot parse config file: ", config_file) // NOSONAR log.Error(registeringProducer, producer_instance_name, failedMessageLabel) return false } var newTypeNames []string for i := 0; i < len(data.ProdDataTypes); i++ { t1 := make(map[string]interface{}) t2 := make(map[string]interface{}) t2["schema"] = "http://json-schema.org/draft-07/schema#" t2["title"] = data.ProdDataTypes[i].ID t2["description"] = data.ProdDataTypes[i].ID t2["type"] = "object" t1["info_job_data_schema"] = t2 json, err := jsoniter.Marshal(t1) if err != nil { log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID) // NOSONAR log.Error(registeringProducer, producer_instance_name, failedMessageLabel) return false } else { ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "") if !ok { log.Error("Cannot register type: ", data.ProdDataTypes[i].ID) // NOSONAR log.Error(registeringProducer, producer_instance_name, failedMessageLabel) return false } newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID) } } log.Debug("Registering types: ", newTypeNames) datalock.Lock() defer datalock.Unlock() for _, v := range data.ProdDataTypes { log.Info("Adding type job for type: ", v.ID, " Type added to configuration") startTypeJob(v) } dataTypes.InfoTypes = data log.Debug("Datatypes: ", dataTypes.InfoTypes) log.Info(registeringProducer, producer_instance_name, " - OK") return true } func startTypeJob(dp dataTypes.DataType) { log.Info("Starting type job: ", dp.ID) jobRecord := dataTypes.TypeJobRecord{} jobRecord.Job_control = make(chan dataTypes.JobControl, 1) jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1) jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length) jobRecord.InfoType = dp.ID jobRecord.InputTopic = dp.KafkaInputTopic jobRecord.GroupId = "kafka-procon-" + dp.ID jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP") switch dp.ID { case "xml-file-data-to-filestore": go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json") case "xml-file-data": go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "") default: } go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId) dataTypes.TypeJobs[dp.ID] = jobRecord log.Debug("Type job input type: ", dp.InputJobType) }