3 // ========================LICENSE_START=================================
6 // Copyright (C) 2023: Nordix Foundation
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
12 // http://www.apache.org/licenses/LICENSE-2.0
14 // Unless required by applicable law or agreed to in writing, software
15 // distributed under the License is distributed on an "AS IS" BASIS,
16 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 // See the License for the specific language governing permissions and
18 // limitations under the License.
19 // ========================LICENSE_END===================================
24 jsoniter "github.com/json-iterator/go"
25 log "github.com/sirupsen/logrus"
26 "main/common/dataTypes"
28 "main/components/kafkacollector"
38 var ics_server = os.Getenv("ICS")
39 var self = os.Getenv("SELF")
41 // This are optional - set if using SASL protocol is used towards kafka
42 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
44 var bootstrapserver = os.Getenv("KAFKA_SERVER")
46 const config_file = "application_configuration.json"
47 const producer_name = "kafka-producer"
49 var producer_instance_name string = producer_name
51 const reader_queue_length = 100 //Per type job
52 const writer_queue_length = 100 //Per info job
54 var files_volume = os.Getenv("FILES_VOLUME")
56 var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length)
57 var writer_control = make(chan dataTypes.WriterControl, 1)
59 const registration_delay_short = 2
60 const registration_delay_long = 120
66 // Lock for all internal data
67 var datalock sync.Mutex
70 Init dataTypes.AppStates = iota
78 //log.SetLevel(log.InfoLevel)
79 log.SetLevel(log.TraceLevel)
81 log.Info("Server starting...")
84 log.Panic("Env SELF not configured")
86 if bootstrapserver == "" {
87 log.Panic("Env KAFKA_SERVER not set")
90 log.Panic("Env ICS not set")
92 if os.Getenv("KP") != "" {
93 producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
96 go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
98 //Setup proc for periodic type registration
99 var event_chan = make(chan int) //Channel for stopping the proc
100 go periodic_registration(event_chan)
102 //Wait for term/int signal do try to shut down gracefully
103 sigs := make(chan os.Signal, 1)
104 signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
107 fmt.Printf("Received signal %s - application will terminate\n", sig)
108 event_chan <- 0 // Stop periodic registration
110 defer datalock.Unlock()
111 AppState = Terminating
116 //Wait until all go routines has exited
119 fmt.Println("main routine exit")
120 fmt.Println("server stopped")
123 // == Core functions ==//
124 // Run periodic registration of producers
125 func periodic_registration(evtch chan int) {
130 if msg == 0 { // Stop thread
133 case <-time.After(time.Duration(delay) * time.Second):
134 ok := register_producer()
136 delay = registration_delay_long
138 if delay < registration_delay_long {
139 delay += registration_delay_short
141 delay = registration_delay_short
148 func register_producer() bool {
150 log.Info("Registering producer: ", producer_instance_name)
152 file, err := os.ReadFile(config_file)
154 log.Error("Cannot read config file: ", config_file)
155 log.Error("Registering producer: ", producer_instance_name, " - failed")
158 data := dataTypes.DataTypes{}
159 err = jsoniter.Unmarshal([]byte(file), &data)
161 log.Error("Cannot parse config file: ", config_file)
162 log.Error("Registering producer: ", producer_instance_name, " - failed")
165 var new_type_names []string
167 for i := 0; i < len(data.ProdDataTypes); i++ {
168 t1 := make(map[string]interface{})
169 t2 := make(map[string]interface{})
171 t2["schema"] = "http://json-schema.org/draft-07/schema#"
172 t2["title"] = data.ProdDataTypes[i].ID
173 t2["description"] = data.ProdDataTypes[i].ID
174 t2["type"] = "object"
176 t1["info_job_data_schema"] = t2
178 json, err := jsoniter.Marshal(t1)
180 log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
181 log.Error("Registering producer: ", producer_instance_name, " - failed")
184 ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
186 log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
187 log.Error("Registering producer: ", producer_instance_name, " - failed")
190 new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
195 log.Debug("Registering types: ", new_type_names)
197 defer datalock.Unlock()
199 for _, v := range data.ProdDataTypes {
200 log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
204 dataTypes.InfoTypes = data
205 log.Debug("Datatypes: ", dataTypes.InfoTypes)
206 log.Info("Registering producer: ", producer_instance_name, " - OK")
210 func start_type_job(dp dataTypes.DataType) {
211 log.Info("Starting type job: ", dp.ID)
212 job_record := dataTypes.TypeJobRecord{}
214 job_record.Job_control = make(chan dataTypes.JobControl, 1)
215 job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
216 job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
217 job_record.InfoType = dp.ID
218 job_record.InputTopic = dp.KafkaInputTopic
219 job_record.GroupId = "kafka-procon-" + dp.ID
220 job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
223 case "xml-file-data-to-filestore":
224 go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
225 case "xml-file-data":
226 go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
230 go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
232 dataTypes.TypeJobs[dp.ID] = job_record
233 log.Debug("Type job input type: ", dp.InputJobType)