1 // ============LICENSE_START===============================================
2 // Copyright (C) 2023 Nordix Foundation. All rights reserved.
3 // ========================================================================
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 // ============LICENSE_END=================================================
43 "github.com/google/uuid"
44 "golang.org/x/oauth2/clientcredentials"
46 log "github.com/sirupsen/logrus"
48 "github.com/gorilla/mux"
52 "github.com/confluentinc/confluent-kafka-go/kafka"
53 influxdb2 "github.com/influxdata/influxdb-client-go/v2"
54 jsoniter "github.com/json-iterator/go"
55 "github.com/minio/minio-go/v7"
56 "github.com/minio/minio-go/v7/pkg/credentials"
62 const https_port = 443
63 const config_file = "application_configuration.json"
64 const server_crt = "server.crt"
65 const server_key = "server.key"
67 const producer_name = "kafka-producer"
69 const registration_delay_short = 2
70 const registration_delay_long = 120
80 const reader_queue_length = 100 //Per type job
81 const writer_queue_length = 100 //Per info job
82 const parallelism_limiter = 100 //For all jobs
84 // This are optional - set if using SASL protocol is used towards kafka
85 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
86 var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
87 var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
88 var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
94 type FilterParameters struct {
95 MeasuredEntityDns []string `json:"measuredEntityDns"`
96 MeasTypes []string `json:"measTypes"`
97 MeasObjClass []string `json:"measObjClass"`
98 MeasObjInstIds []string `json:"measObjInstIds"`
101 type InfoJobDataType struct {
103 KafkaOutputTopic string `json:"kafkaOutputTopic"`
105 DbUrl string `json:"db-url"`
106 DbOrg string `json:"db-org"`
107 DbBucket string `json:"db-bucket"`
108 DbToken string `json:"db-token"`
110 FilterParams FilterParameters `json:"filter"`
111 } `json:"info_job_data"`
112 InfoJobIdentity string `json:"info_job_identity"`
113 InfoTypeIdentity string `json:"info_type_identity"`
114 LastUpdated string `json:"last_updated"`
115 Owner string `json:"owner"`
116 TargetURI string `json:"target_uri"`
119 // Type for an infojob
120 type InfoJobRecord struct {
121 job_info InfoJobDataType
124 statistics *InfoJobStats
127 // Type for an infojob
128 type TypeJobRecord struct {
131 data_in_channel chan *KafkaPayload
132 reader_control chan ReaderControl
133 job_control chan JobControl
137 statistics *TypeJobStats
140 // Type for controlling the topic reader
141 type ReaderControl struct {
145 // Type for controlling the topic writer
146 type WriterControl struct {
150 // Type for controlling the job
151 type JobControl struct {
156 type KafkaPayload struct {
162 type FilterMaps struct {
163 sourceNameMap map[string]bool
164 measObjClassMap map[string]bool
165 measObjInstIdsMap map[string]bool
166 measTypesMap map[string]bool
169 type InfluxJobParameters struct {
181 influxParameters InfluxJobParameters
184 // Type for info job statistics
185 type InfoJobStats struct {
190 // Type for type job statistics
191 type TypeJobStats struct {
196 // == API Datatypes ==//
197 // Type for supported data types
198 type DataType struct {
199 ID string `json:"id"`
200 KafkaInputTopic string `json:"kafkaInputTopic"`
201 InputJobType string `json:inputJobType`
202 InputJobDefinition struct {
203 KafkaOutputTopic string `json:kafkaOutputTopic`
204 } `json:inputJobDefinition`
211 type DataTypes struct {
212 ProdDataTypes []DataType `json:"types"`
215 type Minio_buckets struct {
216 Buckets map[string]bool
219 //== External data types ==//
221 // // Data type for event xml file download
222 type XmlFileEventHeader struct {
223 ProductName string `json:"productName"`
224 VendorName string `json:"vendorName"`
225 Location string `json:"location"`
226 Compression string `json:"compression"`
227 SourceName string `json:"sourceName"`
228 FileFormatType string `json:"fileFormatType"`
229 FileFormatVersion string `json:"fileFormatVersion"`
230 StartEpochMicrosec int64 `json:"startEpochMicrosec"`
231 LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
232 Name string `json:"name"`
233 ChangeIdentifier string `json:"changeIdentifier"`
234 InternalLocation string `json:"internalLocation"`
235 TimeZoneOffset string `json:"timeZoneOffset"`
236 //ObjectStoreBucket string `json:"objectStoreBucket"`
239 // Data types for input xml file
240 type MeasCollecFile struct {
241 XMLName xml.Name `xml:"measCollecFile"`
242 Text string `xml:",chardata"`
243 Xmlns string `xml:"xmlns,attr"`
244 Xsi string `xml:"xsi,attr"`
245 SchemaLocation string `xml:"schemaLocation,attr"`
247 Text string `xml:",chardata"`
248 FileFormatVersion string `xml:"fileFormatVersion,attr"`
249 VendorName string `xml:"vendorName,attr"`
250 DnPrefix string `xml:"dnPrefix,attr"`
252 Text string `xml:",chardata"`
253 LocalDn string `xml:"localDn,attr"`
254 ElementType string `xml:"elementType,attr"`
257 Text string `xml:",chardata"`
258 BeginTime string `xml:"beginTime,attr"`
262 Text string `xml:",chardata"`
263 ManagedElement struct {
264 Text string `xml:",chardata"`
265 LocalDn string `xml:"localDn,attr"`
266 SwVersion string `xml:"swVersion,attr"`
267 } `xml:"managedElement"`
269 Text string `xml:",chardata"`
270 MeasInfoId string `xml:"measInfoId,attr"`
272 Text string `xml:",chardata"`
273 JobId string `xml:"jobId,attr"`
276 Text string `xml:",chardata"`
277 Duration string `xml:"duration,attr"`
278 EndTime string `xml:"endTime,attr"`
281 Text string `xml:",chardata"`
282 Duration string `xml:"duration,attr"`
285 Text string `xml:",chardata"`
286 P string `xml:"p,attr"`
289 Text string `xml:",chardata"`
290 MeasObjLdn string `xml:"measObjLdn,attr"`
292 Text string `xml:",chardata"`
293 P string `xml:"p,attr"`
295 Suspect string `xml:"suspect"`
300 Text string `xml:",chardata"`
302 Text string `xml:",chardata"`
303 EndTime string `xml:"endTime,attr"`
308 // Data type for json file
309 // Splitted in sevreal part to allow add/remove in lists
310 type MeasResults struct {
312 SValue string `json:"sValue"`
315 type MeasValues struct {
316 MeasObjInstID string `json:"measObjInstId"`
317 SuspectFlag string `json:"suspectFlag"`
318 MeasResultsList []MeasResults `json:"measResults"`
321 type SMeasTypes struct {
322 SMeasType string `json:"sMeasTypesList"`
325 type MeasInfoList struct {
327 SMeasInfoID string `json:"sMeasInfoId"`
328 } `json:"measInfoId"`
330 SMeasTypesList []string `json:"sMeasTypesList"`
332 MeasValuesList []MeasValues `json:"measValuesList"`
335 type PMJsonFile struct {
337 CommonEventHeader struct {
338 Domain string `json:"domain"`
339 EventID string `json:"eventId"`
340 Sequence int `json:"sequence"`
341 EventName string `json:"eventName"`
342 SourceName string `json:"sourceName"`
343 ReportingEntityName string `json:"reportingEntityName"`
344 Priority string `json:"priority"`
345 StartEpochMicrosec int64 `json:"startEpochMicrosec"`
346 LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
347 Version string `json:"version"`
348 VesEventListenerVersion string `json:"vesEventListenerVersion"`
349 TimeZoneOffset string `json:"timeZoneOffset"`
350 } `json:"commonEventHeader"`
351 Perf3GppFields struct {
352 Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
353 MeasDataCollection struct {
354 GranularityPeriod int `json:"granularityPeriod"`
355 MeasuredEntityUserName string `json:"measuredEntityUserName"`
356 MeasuredEntityDn string `json:"measuredEntityDn"`
357 MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"`
358 SMeasInfoList []MeasInfoList `json:"measInfoList"`
359 } `json:"measDataCollection"`
360 } `json:"perf3gppFields"`
364 // Data type for converted json file message
365 type FileDownloadedEvt struct {
366 Filename string `json:"filename"`
373 // Lock for all internal data
374 var datalock sync.Mutex
376 var producer_instance_name string = producer_name
378 // Keep all info type jobs, key == type id
379 var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
381 // Keep all info jobs, key == job id
382 var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
384 var InfoTypes DataTypes
386 // Limiter - valid for all jobs
387 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
389 // TODO: Config param?
390 var bucket_location = "swe"
392 var httpclient = &http.Client{}
394 // == Env variables ==//
395 var bootstrapserver = os.Getenv("KAFKA_SERVER")
396 var files_volume = os.Getenv("FILES_VOLUME")
397 var ics_server = os.Getenv("ICS")
398 var self = os.Getenv("SELF")
399 var filestore_user = os.Getenv("FILESTORE_USER")
400 var filestore_pwd = os.Getenv("FILESTORE_PWD")
401 var filestore_server = os.Getenv("FILESTORE_SERVER")
403 var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
404 var writer_control = make(chan WriterControl, 1)
406 var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
411 //log.SetLevel(log.InfoLevel)
412 log.SetLevel(log.TraceLevel)
414 log.Info("Server starting...")
417 log.Panic("Env SELF not configured")
419 if bootstrapserver == "" {
420 log.Panic("Env KAFKA_SERVER not set")
422 if ics_server == "" {
423 log.Panic("Env ICS not set")
425 if os.Getenv("KP") != "" {
426 producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
429 rtr := mux.NewRouter()
430 rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
431 rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
432 rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
433 rtr.HandleFunc("/statistics", statistics)
434 rtr.HandleFunc("/logging/{level}", logging_level)
435 rtr.HandleFunc("/logging", logging_level)
436 rtr.HandleFunc("/", alive)
438 //For perf/mem profiling
439 rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
441 http.Handle("/", rtr)
443 http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
445 cer, err := tls.LoadX509KeyPair(server_crt, server_key)
447 log.Error("Cannot load key and cert - %v\n", err)
450 config := &tls.Config{Certificates: []tls.Certificate{cer}}
451 https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
453 //TODO: Make http on/off configurable
456 log.Info("Starting http service...")
457 err := http_server.ListenAndServe()
458 if err == http.ErrServerClosed { // graceful shutdown
459 log.Info("http server shutdown...")
460 } else if err != nil {
461 log.Error("http server error: %v\n", err)
465 //TODO: Make https on/off configurable
468 log.Info("Starting https service...")
469 err := https_server.ListenAndServe()
470 if err == http.ErrServerClosed { // graceful shutdown
471 log.Info("https server shutdown...")
472 } else if err != nil {
473 log.Error("https server error: %v\n", err)
476 check_tcp(strconv.Itoa(http_port))
477 check_tcp(strconv.Itoa(https_port))
479 go start_topic_writer(writer_control, data_out_channel)
481 //Setup proc for periodic type registration
482 var event_chan = make(chan int) //Channel for stopping the proc
483 go periodic_registration(event_chan)
485 //Wait for term/int signal do try to shut down gracefully
486 sigs := make(chan os.Signal, 1)
487 signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
490 fmt.Printf("Received signal %s - application will terminate\n", sig)
491 event_chan <- 0 // Stop periodic registration
493 defer datalock.Unlock()
494 AppState = Terminating
495 http_server.Shutdown(context.Background())
496 https_server.Shutdown(context.Background())
498 for key, _ := range TypeJobs {
499 log.Info("Stopping type job:", key)
500 for _, dp := range InfoTypes.ProdDataTypes {
510 //Wait until all go routines has exited
513 fmt.Println("main routine exit")
514 fmt.Println("server stopped")
517 func check_tcp(port string) {
518 log.Info("Checking tcp port: ", port)
520 address := net.JoinHostPort("localhost", port)
522 conn, err := net.DialTimeout("tcp", address, 3*time.Second)
524 log.Info("Checking tcp port: ", port, " failed, retrying...")
527 log.Info("Checking tcp port: ", port, " - OK")
531 log.Info("Checking tcp port: ", port, " failed, retrying...")
537 //== Core functions ==//
539 // Run periodic registration of producers
540 func periodic_registration(evtch chan int) {
545 if msg == 0 { // Stop thread
548 case <-time.After(time.Duration(delay) * time.Second):
549 ok := register_producer()
551 delay = registration_delay_long
553 if delay < registration_delay_long {
554 delay += registration_delay_short
556 delay = registration_delay_short
563 func register_producer() bool {
565 log.Info("Registering producer: ", producer_instance_name)
567 file, err := os.ReadFile(config_file)
569 log.Error("Cannot read config file: ", config_file)
570 log.Error("Registering producer: ", producer_instance_name, " - failed")
574 err = jsoniter.Unmarshal([]byte(file), &data)
576 log.Error("Cannot parse config file: ", config_file)
577 log.Error("Registering producer: ", producer_instance_name, " - failed")
580 var new_type_names []string
582 for i := 0; i < len(data.ProdDataTypes); i++ {
583 t1 := make(map[string]interface{})
584 t2 := make(map[string]interface{})
586 t2["schema"] = "http://json-schema.org/draft-07/schema#"
587 t2["title"] = data.ProdDataTypes[i].ID
588 t2["description"] = data.ProdDataTypes[i].ID
589 t2["type"] = "object"
591 t1["info_job_data_schema"] = t2
593 json, err := json.Marshal(t1)
595 log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
596 log.Error("Registering producer: ", producer_instance_name, " - failed")
599 //TODO: http/https should be configurable
600 ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
602 log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
603 log.Error("Registering producer: ", producer_instance_name, " - failed")
606 new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
611 log.Debug("Registering types: ", new_type_names)
612 m := make(map[string]interface{})
613 m["supported_info_types"] = new_type_names
614 //TODO: http/https should be configurable
615 m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
616 //TODO: http/https should be configurable
617 m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
619 json, err := json.Marshal(m)
621 log.Error("Cannot create json for producer: ", producer_instance_name)
622 log.Error("Registering producer: ", producer_instance_name, " - failed")
625 ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
627 log.Error("Cannot register producer: ", producer_instance_name)
628 log.Error("Registering producer: ", producer_instance_name, " - failed")
632 defer datalock.Unlock()
634 var current_type_names []string
635 for _, v := range InfoTypes.ProdDataTypes {
636 current_type_names = append(current_type_names, v.ID)
637 if contains_str(new_type_names, v.ID) {
639 log.Debug("Type ", v.ID, " exists")
643 log.Info("Removing type job for type: ", v.ID, " Type not in configuration")
648 for _, v := range data.ProdDataTypes {
649 if contains_str(current_type_names, v.ID) {
651 log.Debug("Type ", v.ID, " exists")
655 log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
661 log.Debug("Datatypes: ", InfoTypes)
663 log.Info("Registering producer: ", producer_instance_name, " - OK")
667 func remove_type_job(dp DataType) {
668 log.Info("Removing type job: ", dp.ID)
669 j, ok := TypeJobs[dp.ID]
671 j.reader_control <- ReaderControl{"EXIT"}
674 if dp.ext_job_created == true {
675 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
676 //TODO: http/https should be configurable
677 ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
679 log.Error("Cannot delete job: ", dp.ext_job_id)
681 dp.ext_job_created = false
687 func start_type_job(dp DataType) {
688 log.Info("Starting type job: ", dp.ID)
689 job_record := TypeJobRecord{}
691 job_record.job_control = make(chan JobControl, 1)
692 job_record.reader_control = make(chan ReaderControl, 1)
693 job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
694 job_record.InfoType = dp.ID
695 job_record.InputTopic = dp.KafkaInputTopic
696 job_record.groupId = "kafka-procon-" + dp.ID
697 job_record.clientId = dp.ID + "-" + os.Getenv("KP")
698 var stats TypeJobStats
699 job_record.statistics = &stats
702 case "xml-file-data-to-filestore":
703 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
704 case "xml-file-data":
705 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
706 case "json-file-data-from-filestore":
707 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
708 case "json-file-data":
709 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
710 case "json-file-data-from-filestore-to-influx":
711 go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
712 // case "json-data-to-influx":
713 // go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel)
718 go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)
720 TypeJobs[dp.ID] = job_record
721 log.Debug("Type job input type: ", dp.InputJobType)
725 func create_ext_job(dp DataType) {
726 if dp.InputJobType != "" {
727 jb := make(map[string]interface{})
728 jb["info_type_id"] = dp.InputJobType
729 jb["job_owner"] = "console" //TODO:
730 jb["status_notification_uri"] = "http://callback:80/post"
731 jb1 := make(map[string]interface{})
732 jb["job_definition"] = jb1
733 jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic
735 json, err := json.Marshal(jb)
736 dp.ext_job_created = false
739 log.Error("Cannot create json for type: ", dp.InputJobType)
743 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
744 //TODO: http/https should be configurable
747 ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
749 log.Error("Cannot register job: ", dp.InputJobType)
750 //TODO: Restart after long time?
753 log.Debug("Registered job ok: ", dp.InputJobType)
754 dp.ext_job_created = true
759 func remove_info_job(jobid string) {
760 log.Info("Removing info job: ", jobid)
764 jc.command = "REMOVE-FILTER"
766 infoJob := InfoJobs[jobid]
767 typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
768 typeJob.job_control <- jc
769 delete(InfoJobs, jobid)
773 // == Helper functions ==//
775 // Function to check the status of a mutex lock
776 func MutexLocked(m *sync.Mutex) bool {
777 state := reflect.ValueOf(m).Elem().FieldByName("state")
778 return state.Int()&mutexLocked == mutexLocked
781 // Test if slice contains a string
782 func contains_str(s []string, e string) bool {
783 for _, a := range s {
791 // Send a http request with json (json may be nil)
792 func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
794 // set the HTTP method, url, and request body
795 var req *http.Request
798 req, err = http.NewRequest(method, url, http.NoBody)
800 req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
801 req.Header.Set("Content-Type", "application/json; charset=utf-8")
804 log.Error("Cannot create http request, method: ", method, " url: ", url)
809 token, err := fetch_token()
811 log.Error("Cannot fetch token for http request: ", err)
814 req.Header.Set("Authorization", "Bearer "+token.TokenValue)
817 log.Debug("HTTP request: ", req)
819 log.Debug("Sending http request")
820 resp, err2 := httpclient.Do(req)
822 log.Error("Http request error: ", err2)
823 log.Error("Cannot send http request method: ", method, " url: ", url)
825 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
826 log.Debug("Accepted http status: ", resp.StatusCode)
830 log.Debug("HTTP resp: ", resp)
836 // // Send a http request with json (json may be nil)
837 // func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
839 // // set the HTTP method, url, and request body
840 // var req *http.Request
843 // req, err = http.NewRequest(method, url, http.NoBody)
845 // req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
846 // req.Header.Set("Content-Type", "application/json; charset=utf-8")
849 // log.Error("Cannot create http request, method: ", method, " url: ", url)
854 // token, err := fetch_token()
856 // log.Error("Cannot fetch token for http request: ", err)
859 // req.Header.Set("Authorization", "Bearer "+token.TokenValue)
862 // log.Debug("HTTP request: ", req)
869 // for i := retries; i > 0; i-- {
870 // log.Debug("Sending http request")
871 // resp, err2 := httpclient.Do(req)
873 // log.Error("Http request error: ", err2)
874 // log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i-1)
876 // time.Sleep(time.Duration(sleep_time) * time.Second)
877 // sleep_time = 2 * sleep_time
879 // if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
880 // log.Debug("Accepted http status: ", resp.StatusCode)
884 // log.Debug("HTTP resp: ", resp)
891 // // Send a http request with json (json may be nil)
892 // func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
893 // // initialize http client
894 // client := &http.Client{}
896 // // set the HTTP method, url, and request body
897 // var req *http.Request
900 // req, err = http.NewRequest(method, url, http.NoBody)
902 // req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
903 // req.Header.Set("Content-Type", "application/json; charset=utf-8")
906 // log.Error("Cannot create http request method: ", method, " url: ", url)
912 // token, err := fetch_token()
914 // log.Error("Cannot fetch token for http request: ", err)
917 // req.Header.Add("Authorization", "Bearer "+token.TokenValue)
919 // log.Debug("HTTP request: ", req)
921 // b, berr := io.ReadAll(req.Body)
923 // log.Debug("HTTP request body length: ", len(b))
925 // log.Debug("HTTP request - cannot check body length: ", berr)
928 // log.Debug("HTTP request null json")
930 // log.Debug("HTTP request json: ", string(json))
932 // requestDump, cerr := httputil.DumpRequestOut(req, true)
936 // fmt.Println(string(requestDump))
943 // for i := retries; i > 0; i-- {
944 // resp, err2 := client.Do(req)
946 // log.Error("Http request error: ", err2)
947 // log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i)
949 // time.Sleep(time.Duration(sleep_time) * time.Second)
950 // sleep_time = 2 * sleep_time
952 // if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
953 // log.Debug("Accepted http status: ", resp.StatusCode)
954 // defer resp.Body.Close()
962 func fetch_token() (*kafka.OAuthBearerToken, error) {
963 log.Debug("Get token inline")
964 conf := &clientcredentials.Config{
965 ClientID: creds_client_id,
966 ClientSecret: creds_client_secret,
967 TokenURL: creds_service_url,
969 token, err := conf.Token(context.Background())
971 log.Warning("Cannot fetch access token: ", err)
974 extensions := map[string]string{}
975 log.Debug("=====================================================")
976 log.Debug("token: ", token)
977 log.Debug("=====================================================")
978 log.Debug("TokenValue: ", token.AccessToken)
979 log.Debug("=====================================================")
980 log.Debug("Expiration: ", token.Expiry)
982 // t := token.Expiry.Add(-time.Minute)
983 // log.Debug("Modified expiration: ", t)
984 oauthBearerToken := kafka.OAuthBearerToken{
985 TokenValue: token.AccessToken,
987 Extensions: extensions,
990 return &oauthBearerToken, nil
993 // Function to print memory details
994 // https://pkg.go.dev/runtime#MemStats
995 func PrintMemUsage() {
996 if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
997 var m runtime.MemStats
998 runtime.ReadMemStats(&m)
999 fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
1000 fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
1001 fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
1002 fmt.Printf("\tNumGC = %v\n", m.NumGC)
1003 fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
1004 fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
1005 fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
1006 fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
1007 fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
1008 fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
1009 fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
1013 func bToMb(b uint64) uint64 {
1014 return b / 1024 / 1024
1017 func generate_uuid_from_type(s string) string {
1026 uuid, _ := uuid.FromBytes(b)
1027 return uuid.String()
1030 // Write gzipped data to a Writer
1031 func gzipWrite(w io.Writer, data *[]byte) error {
1032 gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
1038 _, err2 := gw.Write(*data)
1042 // Write gunzipped data from Reader to a Writer
1043 func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
1044 gr, err1 := gzip.NewReader(data)
1050 data2, err2 := io.ReadAll(gr)
1054 _, err3 := w.Write(data2)
1061 func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
1062 tctx := context.Background()
1063 err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
1065 // Check to see if we already own this bucket (which happens if you run this twice)
1066 exists, errBucketExists := mc.BucketExists(tctx, bucket)
1067 if errBucketExists == nil && exists {
1068 log.Debug("Already own bucket:", bucket)
1069 add_bucket(client_id, bucket)
1072 log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
1076 log.Debug("Successfully created bucket: ", bucket)
1077 add_bucket(client_id, bucket)
1081 func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
1082 ok := bucket_exist(client_id, bucket)
1086 tctx := context.Background()
1087 exists, err := mc.BucketExists(tctx, bucket)
1088 if err == nil && exists {
1089 log.Debug("Already own bucket:", bucket)
1092 log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
1096 func add_bucket(minio_id string, bucket string) {
1098 defer datalock.Unlock()
1100 b, ok := minio_bucketlist[minio_id]
1103 b.Buckets = make(map[string]bool)
1105 b.Buckets[bucket] = true
1106 minio_bucketlist[minio_id] = b
1109 func bucket_exist(minio_id string, bucket string) bool {
1111 defer datalock.Unlock()
1113 b, ok := minio_bucketlist[minio_id]
1117 _, ok = b.Buckets[bucket]
1121 //== http api functions ==//
1123 // create/update job
1124 func create_job(w http.ResponseWriter, req *http.Request) {
1125 log.Debug("Create job, http method: ", req.Method)
1126 if req.Method != http.MethodPost {
1127 log.Error("Create job, http method not allowed")
1128 w.WriteHeader(http.StatusMethodNotAllowed)
1131 ct := req.Header.Get("Content-Type")
1132 if ct != "application/json" {
1133 log.Error("Create job, bad content type")
1134 http.Error(w, "Bad content type", http.StatusBadRequest)
1138 var t InfoJobDataType
1139 err := json.NewDecoder(req.Body).Decode(&t)
1141 log.Error("Create job, cannot parse json,", err)
1142 http.Error(w, "Cannot parse json", http.StatusBadRequest)
1145 log.Debug("Creating job, id: ", t.InfoJobIdentity)
1147 defer datalock.Unlock()
1149 job_id := t.InfoJobIdentity
1150 job_record, job_found := InfoJobs[job_id]
1151 type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
1154 log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1155 http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1158 } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
1159 log.Error("Job cannot change type")
1160 http.Error(w, "Job cannot change type", http.StatusBadRequest)
1162 } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
1163 log.Error("Job cannot change topic")
1164 http.Error(w, "Job cannot change topic", http.StatusBadRequest)
1166 } else if !found_type {
1167 //Should never happen, if the type is removed then job is stopped
1168 log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1169 http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1173 //TODO: Verify that job contains enough parameters...
1176 job_record = InfoJobRecord{}
1177 job_record.job_info = t
1178 output_topic := t.InfoJobData.KafkaOutputTopic
1179 job_record.output_topic = t.InfoJobData.KafkaOutputTopic
1180 log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic)
1182 var stats InfoJobStats
1183 job_record.statistics = &stats
1186 filter.JobId = job_id
1187 filter.OutputTopic = job_record.output_topic
1191 jc.command = "ADD-FILTER"
1194 if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1196 fm.sourceNameMap = make(map[string]bool)
1197 fm.measObjClassMap = make(map[string]bool)
1198 fm.measObjInstIdsMap = make(map[string]bool)
1199 fm.measTypesMap = make(map[string]bool)
1200 if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
1201 for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
1202 fm.sourceNameMap[v] = true
1205 if t.InfoJobData.FilterParams.MeasObjClass != nil {
1206 for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
1207 fm.measObjClassMap[v] = true
1210 if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
1211 for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
1212 fm.measObjInstIdsMap[v] = true
1215 if t.InfoJobData.FilterParams.MeasTypes != nil {
1216 for _, v := range t.InfoJobData.FilterParams.MeasTypes {
1217 fm.measTypesMap[v] = true
1222 if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1223 influxparam := InfluxJobParameters{}
1224 influxparam.DbUrl = t.InfoJobData.DbUrl
1225 influxparam.DbOrg = t.InfoJobData.DbOrg
1226 influxparam.DbBucket = t.InfoJobData.DbBucket
1227 influxparam.DbToken = t.InfoJobData.DbToken
1228 filter.influxParameters = influxparam
1232 InfoJobs[job_id] = job_record
1234 type_job_record.job_control <- jc
1243 func delete_job(w http.ResponseWriter, req *http.Request) {
1244 if req.Method != http.MethodDelete {
1245 w.WriteHeader(http.StatusMethodNotAllowed)
1249 defer datalock.Unlock()
1251 vars := mux.Vars(req)
1253 if id, ok := vars["job_id"]; ok {
1254 if _, ok := InfoJobs[id]; ok {
1256 w.WriteHeader(http.StatusNoContent)
1257 log.Info("Job ", id, " deleted")
1261 w.WriteHeader(http.StatusNotFound)
1265 func supervise_job(w http.ResponseWriter, req *http.Request) {
1266 if req.Method != http.MethodGet {
1267 w.WriteHeader(http.StatusMethodNotAllowed)
1271 defer datalock.Unlock()
1273 vars := mux.Vars(req)
1275 log.Debug("Supervising, job: ", vars["job_id"])
1276 if id, ok := vars["job_id"]; ok {
1277 if _, ok := InfoJobs[id]; ok {
1278 log.Debug("Supervision ok, job", id)
1282 w.WriteHeader(http.StatusNotFound)
1285 // producer supervision
1286 func supervise_producer(w http.ResponseWriter, req *http.Request) {
1287 if req.Method != http.MethodGet {
1288 w.WriteHeader(http.StatusMethodNotAllowed)
1292 w.WriteHeader(http.StatusOK)
1295 // producer statictics, all jobs
1296 func statistics(w http.ResponseWriter, req *http.Request) {
1297 if req.Method != http.MethodGet {
1298 w.WriteHeader(http.StatusMethodNotAllowed)
1301 m := make(map[string]interface{})
1302 log.Debug("producer statictics, locked? ", MutexLocked(&datalock))
1304 defer datalock.Unlock()
1305 req.Header.Set("Content-Type", "application/json; charset=utf-8")
1306 m["number-of-jobs"] = len(InfoJobs)
1307 m["number-of-types"] = len(InfoTypes.ProdDataTypes)
1308 qm := make(map[string]interface{})
1310 for key, elem := range InfoJobs {
1311 jm := make(map[string]interface{})
1313 jm["type"] = elem.job_info.InfoTypeIdentity
1314 typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
1315 jm["groupId"] = typeJob.groupId
1316 jm["clientID"] = typeJob.clientId
1317 jm["input topic"] = typeJob.InputTopic
1318 jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
1319 jm["output topic"] = elem.output_topic
1320 jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
1321 jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
1322 jm["msg_out (job)"] = elem.statistics.out_msg_cnt
1325 json, err := json.Marshal(m)
1327 w.WriteHeader(http.StatusInternalServerError)
1328 log.Error("Cannot marshal statistics json")
1331 _, err = w.Write(json)
1333 w.WriteHeader(http.StatusInternalServerError)
1334 log.Error("Cannot send statistics json")
1339 // Simple alive check
1340 func alive(w http.ResponseWriter, req *http.Request) {
1344 // Get/Set logging level
1345 func logging_level(w http.ResponseWriter, req *http.Request) {
1346 vars := mux.Vars(req)
1347 if level, ok := vars["level"]; ok {
1348 if req.Method == http.MethodPut {
1351 log.SetLevel(log.TraceLevel)
1353 log.SetLevel(log.DebugLevel)
1355 log.SetLevel(log.InfoLevel)
1357 log.SetLevel(log.WarnLevel)
1359 log.SetLevel(log.ErrorLevel)
1361 log.SetLevel(log.FatalLevel)
1363 log.SetLevel(log.PanicLevel)
1365 w.WriteHeader(http.StatusNotFound)
1368 w.WriteHeader(http.StatusMethodNotAllowed)
1371 if req.Method == http.MethodGet {
1373 if log.IsLevelEnabled(log.PanicLevel) {
1375 } else if log.IsLevelEnabled(log.FatalLevel) {
1377 } else if log.IsLevelEnabled(log.ErrorLevel) {
1379 } else if log.IsLevelEnabled(log.WarnLevel) {
1381 } else if log.IsLevelEnabled(log.InfoLevel) {
1383 } else if log.IsLevelEnabled(log.DebugLevel) {
1385 } else if log.IsLevelEnabled(log.TraceLevel) {
1388 w.Header().Set("Content-Type", "application/text")
1389 w.Write([]byte(msg))
1391 w.WriteHeader(http.StatusMethodNotAllowed)
1396 func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
1398 log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
1401 var c *kafka.Consumer = nil
1404 for topic_ok == false {
1407 case reader_ctrl := <-control_ch:
1408 if reader_ctrl.command == "EXIT" {
1409 log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1410 //TODO: Stop consumer if present?
1411 data_ch <- nil //Signal to job handler
1415 case <-time.After(1 * time.Second):
1420 c = create_kafka_consumer(type_id, gid, cid)
1422 log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
1424 log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
1427 if c != nil && topic_ok == false {
1428 err := c.SubscribeTopics([]string{topic}, nil)
1430 log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
1432 log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
1438 log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
1440 var event_chan = make(chan int)
1444 case evt := <-c.Events():
1446 case kafka.OAuthBearerTokenRefresh:
1447 log.Debug("New consumer token needed: ", evt)
1448 token, err := fetch_token()
1450 log.Warning("Cannot cannot fetch token: ", err)
1451 c.SetOAuthBearerTokenFailure(err.Error())
1453 setTokenError := c.SetOAuthBearerToken(*token)
1454 if setTokenError != nil {
1455 log.Warning("Cannot cannot set token: ", setTokenError)
1456 c.SetOAuthBearerTokenFailure(setTokenError.Error())
1460 //TODO: Handle these?
1461 log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
1464 case msg := <-event_chan:
1468 case <-time.After(1 * time.Second):
1478 //maxDur := 1 * time.Second
1481 case reader_ctrl := <-control_ch:
1482 if reader_ctrl.command == "EXIT" {
1484 log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1485 data_ch <- nil //Signal to job handler
1493 log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
1496 switch e := ev.(type) {
1497 case *kafka.Message:
1498 var kmsg KafkaPayload
1501 c.Commit() //TODO: Ok?
1503 //TODO: Check for exception
1506 log.Debug("Reader msg: ", &kmsg)
1507 log.Debug("Reader - data_ch ", data_ch)
1509 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
1511 case kafka.OAuthBearerTokenRefresh:
1512 log.Debug("New consumer token needed: ", ev)
1513 token, err := fetch_token()
1515 log.Warning("Cannot cannot fetch token: ", err)
1516 c.SetOAuthBearerTokenFailure(err.Error())
1518 setTokenError := c.SetOAuthBearerToken(*token)
1519 if setTokenError != nil {
1520 log.Warning("Cannot cannot set token: ", setTokenError)
1521 c.SetOAuthBearerTokenFailure(setTokenError.Error())
1525 fmt.Printf("Ignored %v\n", e)
1529 // msg, err := c.ReadMessage(maxDur)
1531 // var kmsg KafkaPayload
1534 // c.Commit() //TODO: Ok?
1536 // //TODO: Check for exception
1538 // stats.in_msg_cnt++
1539 // log.Debug("Reader msg: ", &kmsg)
1540 // log.Debug("Reader - data_ch ", data_ch)
1542 // log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic, ", reason: ", err)
1551 func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
1553 var kafka_producer *kafka.Producer
1556 log.Info("Topic writer starting")
1558 // Wait for kafka producer to become available - and be prepared to exit the writer
1559 for kafka_producer == nil {
1561 case writer_ctl := <-control_ch:
1562 if writer_ctl.command == "EXIT" {
1566 kafka_producer = start_producer()
1567 if kafka_producer == nil {
1568 log.Debug("Could not start kafka producer - retrying")
1569 time.Sleep(1 * time.Second)
1571 log.Debug("Kafka producer started")
1572 //defer kafka_producer.Close()
1577 var event_chan = make(chan int)
1581 case evt := <-kafka_producer.Events():
1582 //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend?
1584 case *kafka.Message:
1585 m := evt.(*kafka.Message)
1587 if m.TopicPartition.Error != nil {
1588 log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
1590 log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
1593 log.Debug("Dumping topic writer event, error: ", evt)
1594 case kafka.OAuthBearerTokenRefresh:
1595 log.Debug("New producer token needed: ", evt)
1596 token, err := fetch_token()
1598 log.Warning("Cannot cannot fetch token: ", err)
1599 kafka_producer.SetOAuthBearerTokenFailure(err.Error())
1601 setTokenError := kafka_producer.SetOAuthBearerToken(*token)
1602 if setTokenError != nil {
1603 log.Warning("Cannot cannot set token: ", setTokenError)
1604 kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
1608 log.Debug("Dumping topic writer event, unknown: ", evt)
1611 case msg := <-event_chan:
1615 case <-time.After(1 * time.Second):
1625 case writer_ctl := <-control_ch:
1626 if writer_ctl.command == "EXIT" {
1627 // ignore - wait for channel signal
1630 case kmsg := <-data_ch:
1633 // TODO: Close producer?
1634 log.Info("Topic writer stopped by channel signal - start_topic_writer")
1635 defer kafka_producer.Close()
1642 for retry := 1; retry <= retries && msg_ok == false; retry++ {
1643 err = kafka_producer.Produce(&kafka.Message{
1644 TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
1645 Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
1648 incr_out_msg_cnt(kmsg.jobid)
1650 log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
1652 log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
1653 time.Sleep(time.Duration(retry) * time.Second)
1657 //TODO: Retry sending msg?
1658 log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
1660 case <-time.After(1000 * time.Millisecond):
1669 func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
1670 var cm kafka.ConfigMap
1671 if creds_grant_type == "" {
1672 log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
1673 cm = kafka.ConfigMap{
1674 "bootstrap.servers": bootstrapserver,
1677 "auto.offset.reset": "latest",
1678 "enable.auto.commit": false,
1679 //"auto.commit.interval.ms": 5000,
1682 log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
1683 cm = kafka.ConfigMap{
1684 "bootstrap.servers": bootstrapserver,
1687 "auto.offset.reset": "latest",
1688 "enable.auto.commit": false,
1689 "sasl.mechanism": "OAUTHBEARER",
1690 "security.protocol": "SASL_PLAINTEXT",
1693 c, err := kafka.NewConsumer(&cm)
1695 //TODO: How to handle autocommit or commit message by message
1696 //TODO: Make arg to kafka configurable
1699 log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
1704 log.Info("Created kafka consumer for type: ", type_id, " OK")
1708 // Start kafka producer
1709 func start_producer() *kafka.Producer {
1710 log.Info("Creating kafka producer")
1712 var cm kafka.ConfigMap
1713 if creds_grant_type == "" {
1714 log.Info("Creating kafka SASL plain text producer")
1715 cm = kafka.ConfigMap{
1716 "bootstrap.servers": bootstrapserver,
1719 log.Info("Creating kafka SASL plain text producer")
1720 cm = kafka.ConfigMap{
1721 "bootstrap.servers": bootstrapserver,
1722 "sasl.mechanism": "OAUTHBEARER",
1723 "security.protocol": "SASL_PLAINTEXT",
1727 p, err := kafka.NewProducer(&cm)
1729 log.Error("Cannot create kafka producer,", err)
1735 func start_adminclient() *kafka.AdminClient {
1736 log.Info("Creating kafka admin client")
1737 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
1739 log.Error("Cannot create kafka admin client,", err)
1745 func create_minio_client(id string) (*minio.Client, *error) {
1746 log.Debug("Get minio client")
1747 minio_client, err := minio.New(filestore_server, &minio.Options{
1749 Creds: credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
1752 log.Error("Cannot create minio client, ", err)
1755 return minio_client, nil
1758 func incr_out_msg_cnt(jobid string) {
1759 j, ok := InfoJobs[jobid]
1761 j.statistics.out_msg_cnt++
1765 func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
1767 log.Info("Type job", type_id, " started")
1769 filters := make(map[string]Filter)
1770 topic_list := make(map[string]string)
1771 var mc *minio.Client
1772 const mc_id = "mc_" + "start_job_xml_file_data"
1776 case job_ctl := <-control_ch:
1777 log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
1778 switch job_ctl.command {
1780 //ignore cmd - handled by channel signal
1782 filters[job_ctl.filter.JobId] = job_ctl.filter
1783 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
1785 tmp_topic_list := make(map[string]string)
1786 for k, v := range topic_list {
1787 tmp_topic_list[k] = v
1789 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
1790 topic_list = tmp_topic_list
1791 case "REMOVE-FILTER":
1792 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
1794 tmp_topic_list := make(map[string]string)
1795 for k, v := range topic_list {
1796 tmp_topic_list[k] = v
1798 delete(tmp_topic_list, job_ctl.filter.JobId)
1799 topic_list = tmp_topic_list
1802 case msg := <-data_in_ch:
1804 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
1809 if fsbucket != "" && fvolume == "" {
1812 mc, err = create_minio_client(mc_id)
1814 log.Debug("Cannot create minio client for type job: ", type_id)
1818 //TODO: Sort processed file conversions in order (FIFO)
1819 jobLimiterChan <- struct{}{}
1820 go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
1822 case <-time.After(1 * time.Second):
1831 func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
1837 if fvolume == "" && fsbucket == "" {
1838 log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
1840 } else if (fvolume != "") && (fsbucket != "") {
1841 log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
1846 var evt_data XmlFileEventHeader
1848 err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
1850 log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
1853 log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
1855 var reader io.Reader
1858 INPUTBUCKET := "ropfiles"
1862 filename = fvolume + "/" + evt_data.Name
1863 fi, err := os.Open(filename)
1866 log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
1871 //} else if evt_data.ObjectStoreBucket != "" {
1873 filename = evt_data.Name
1875 tctx := context.Background()
1876 mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
1878 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
1882 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err)
1888 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message")
1894 log.Error("Cannot get: ", filename, " - null reader")
1897 var file_bytes []byte
1898 if strings.HasSuffix(filename, "gz") {
1900 var buf3 bytes.Buffer
1901 errb := gunzipReaderToWriter(&buf3, reader)
1903 log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
1906 file_bytes = buf3.Bytes()
1907 log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes))
1910 var buf3 bytes.Buffer
1911 _, err2 := io.Copy(&buf3, reader)
1913 log.Error("File ", filename, " - cannot be read, discarding message, ", err)
1916 file_bytes = buf3.Bytes()
1919 b, err := xml_to_json_conv(&file_bytes, &evt_data)
1921 log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
1924 log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b))
1926 new_fn := evt_data.Name + os.Getenv("KP") + ".json"
1927 if outputCompression == "gz" {
1928 new_fn = new_fn + ".gz"
1930 var buf bytes.Buffer
1931 err = gzipWrite(&buf, &b)
1933 log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
1937 log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes))
1944 err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
1946 log.Error("Cannot write file ", new_fn, " - discarding message,", err)
1949 log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes))
1950 } else if fsbucket != "" {
1952 objectName := new_fn
1955 contentType := "application/json"
1956 if strings.HasSuffix(objectName, ".gz") {
1957 contentType = "application/gzip"
1960 // Upload the xml file with PutObject
1961 r := bytes.NewReader(b)
1962 tctx := context.Background()
1963 if check_minio_bucket(mc, mc_id, fsbucket) == false {
1964 err := create_minio_bucket(mc, mc_id, fsbucket)
1966 log.Error("Cannot create bucket: ", fsbucket, ", ", err)
1971 for i := 1; i < 64 && ok == false; i = i * 2 {
1972 info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
1976 log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
1978 log.Warn("Cannot upload (retry): ", objectName, ", ", err)
1980 time.Sleep(time.Duration(i) * time.Second)
1982 log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
1983 log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
1988 log.Error("Cannot upload : ", objectName, ", ", err)
1991 log.Error("Cannot upload: ", objectName, ", no client")
1997 var fde FileDownloadedEvt
1998 fde.Filename = new_fn
1999 j, err := jsoniter.Marshal(fde)
2002 log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
2007 var fde FileDownloadedEvt
2008 fde.Filename = new_fn
2009 j, err := jsoniter.Marshal(fde)
2012 log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
2017 msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
2018 log.Debug("Marshal file-collect event ", time.Since(start).String())
2020 for k, v := range topic_list {
2021 var kmsg *KafkaPayload = new(KafkaPayload)
2025 data_out_channel <- kmsg
2029 func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
2030 var f MeasCollecFile
2032 err := xml.Unmarshal(*f_byteValue, &f)
2034 return nil, errors.New("Cannot unmarshal xml-file")
2036 log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
2039 var pmfile PMJsonFile
2041 //TODO: Fill in more values
2042 pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
2043 pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
2044 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
2045 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
2046 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
2048 for _, it := range f.MeasData.MeasInfo {
2049 var mili MeasInfoList
2050 mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
2051 for _, jt := range it.MeasType {
2052 mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
2054 for _, jt := range it.MeasValue {
2056 mv.MeasObjInstID = jt.MeasObjLdn
2057 mv.SuspectFlag = jt.Suspect
2058 if jt.Suspect == "" {
2059 mv.SuspectFlag = "false"
2061 for _, kt := range jt.R {
2062 ni, _ := strconv.Atoi(kt.P)
2064 mr := MeasResults{ni, nv}
2065 mv.MeasResultsList = append(mv.MeasResultsList, mr)
2067 mili.MeasValuesList = append(mili.MeasValuesList, mv)
2070 pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
2073 pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
2075 //TODO: Fill more values
2076 pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain
2077 pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID
2078 pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence
2079 pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
2080 pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
2081 pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
2082 pmfile.Event.CommonEventHeader.Priority = "" //xfeh.Priority
2083 pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
2084 pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
2085 pmfile.Event.CommonEventHeader.Version = "" //xfeh.Version
2086 pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
2087 pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
2089 log.Debug("Convert xml to json : ", time.Since(start).String())
2092 json, err := jsoniter.Marshal(pmfile)
2093 log.Debug("Marshal json : ", time.Since(start).String())
2096 return nil, errors.New("Cannot marshal converted json")
2101 func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
2103 log.Info("Type job", type_id, " started")
2105 filters := make(map[string]Filter)
2106 filterParams_list := make(map[string]FilterMaps)
2107 // ch_list := make(map[string]chan *KafkaPayload)
2108 topic_list := make(map[string]string)
2109 var mc *minio.Client
2110 const mc_id = "mc_" + "start_job_json_file_data"
2114 case job_ctl := <-control_ch:
2115 log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
2116 switch job_ctl.command {
2118 //ignore cmd - handled by channel signal
2121 filters[job_ctl.filter.JobId] = job_ctl.filter
2122 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
2124 tmp_filterParams_list := make(map[string]FilterMaps)
2125 for k, v := range filterParams_list {
2126 tmp_filterParams_list[k] = v
2128 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
2129 filterParams_list = tmp_filterParams_list
2131 tmp_topic_list := make(map[string]string)
2132 for k, v := range topic_list {
2133 tmp_topic_list[k] = v
2135 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
2136 topic_list = tmp_topic_list
2137 case "REMOVE-FILTER":
2139 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
2141 tmp_filterParams_list := make(map[string]FilterMaps)
2142 for k, v := range filterParams_list {
2143 tmp_filterParams_list[k] = v
2145 delete(tmp_filterParams_list, job_ctl.filter.JobId)
2146 filterParams_list = tmp_filterParams_list
2148 tmp_topic_list := make(map[string]string)
2149 for k, v := range topic_list {
2150 tmp_topic_list[k] = v
2152 delete(tmp_topic_list, job_ctl.filter.JobId)
2153 topic_list = tmp_topic_list
2156 case msg := <-data_in_ch:
2158 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
2166 mc, err = create_minio_client(mc_id)
2168 log.Debug("Cannot create minio client for type job: ", type_id)
2172 //TODO: Sort processed file conversions in order (FIFO)
2173 jobLimiterChan <- struct{}{}
2174 go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
2176 case <-time.After(1 * time.Second):
2184 func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2193 var evt_data FileDownloadedEvt
2194 err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2196 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2199 log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2201 var reader io.Reader
2204 //INPUTBUCKET := "json-file-ready"
2205 INPUTBUCKET := "pm-files-json"
2207 if objectstore == false {
2208 filename = files_volume + "/" + evt_data.Filename
2209 fi, err := os.Open(filename)
2212 log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
2218 filename = "/" + evt_data.Filename
2220 if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
2221 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2224 tctx := context.Background()
2225 mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2227 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2233 log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2239 if strings.HasSuffix(filename, "gz") {
2241 var buf2 bytes.Buffer
2242 errb := gunzipReaderToWriter(&buf2, reader)
2244 log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2249 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2253 d, err := io.ReadAll(reader)
2255 log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2260 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2263 // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
2264 // var pmfile PMJsonFile
2265 // start := time.Now()
2266 // err = jsoniter.Unmarshal(*data, &pmfile)
2267 // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2270 // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2273 for k, v := range filterList {
2275 var pmfile PMJsonFile
2277 err = jsoniter.Unmarshal(*data, &pmfile)
2278 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2281 log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2285 var kmsg *KafkaPayload = new(KafkaPayload)
2286 kmsg.msg = new(kafka.Message)
2287 kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
2288 log.Debug("topic:", topic_list[k])
2289 log.Debug("sourceNameMap:", v.sourceNameMap)
2290 log.Debug("measObjClassMap:", v.measObjClassMap)
2291 log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
2292 log.Debug("measTypesMap:", v.measTypesMap)
2293 //BMX if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
2294 b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2296 log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
2302 // if outputCompression == "json.gz" {
2303 // start := time.Now()
2304 // var buf bytes.Buffer
2305 // err := gzipWrite(&buf, &kmsg.msg.Value)
2307 // log.Error("Cannot compress file/obj ", filename, "for job: ", job_id, " - discarding message, error details", err)
2311 // kmsg.msg.Value = buf.Bytes()
2312 // log.Debug("Compress file/obj ", filename, "for job: ", job_id, " time:", time.Since(start).String())
2314 kmsg.topic = topic_list[k]
2317 data_out_channel <- kmsg
2322 func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
2324 if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
2328 j, err := jsoniter.Marshal(&data)
2330 log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
2333 log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
2337 log.Debug("Filtered json obj: ", resource, " len: ", len(j))
2341 func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
2344 if len(sourceNameMap) != 0 {
2345 if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
2352 var temp_mil []MeasInfoList
2353 for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2356 var cnt_flags []bool
2357 if len(measTypesMap) > 0 {
2359 var temp_mtl []string
2360 for _, v := range zz.MeasTypes.SMeasTypesList {
2361 if measTypesMap[v] {
2362 cnt_flags = append(cnt_flags, true)
2364 temp_mtl = append(temp_mtl, v)
2366 cnt_flags = append(cnt_flags, false)
2371 zz.MeasTypes.SMeasTypesList = temp_mtl
2378 var temp_mvl []MeasValues
2379 for _, yy := range zz.MeasValuesList {
2384 dna := strings.Split(yy.MeasObjInstID, ",")
2385 instName := dna[len(dna)-1]
2386 cls := strings.Split(dna[len(dna)-1], "=")[0]
2388 if len(measObjClassMap) > 0 {
2389 if measObjClassMap[cls] {
2396 if len(measObjInstIdsMap) > 0 {
2397 if measObjInstIdsMap[instName] {
2405 var temp_mrl []MeasResults
2407 for _, v := range yy.MeasResultsList {
2408 if cnt_flags[v.P-1] {
2411 temp_mrl = append(temp_mrl, v)
2414 yy.MeasResultsList = temp_mrl
2419 if keep_class && keep_cntr && keep_inst {
2421 temp_mvl = append(temp_mvl, yy)
2425 zz.MeasValuesList = temp_mvl
2426 temp_mil = append(temp_mil, zz)
2433 if len(temp_mil) == 0 {
2434 log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
2437 data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
2440 log.Debug("Filter: ", time.Since(start).String())
2444 // func json_pm_filter(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
2446 // filter_req := true
2447 // start := time.Now()
2448 // if len(sourceNameMap) != 0 {
2449 // if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
2450 // filter_req = false
2455 // modified := false
2456 // var temp_mil []MeasInfoList
2457 // for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2459 // check_cntr := false
2460 // var cnt_flags []bool
2461 // if len(measTypesMap) > 0 {
2463 // var temp_mtl []string
2464 // for _, v := range zz.MeasTypes.SMeasTypesList {
2465 // if measTypesMap[v] {
2466 // cnt_flags = append(cnt_flags, true)
2468 // temp_mtl = append(temp_mtl, v)
2470 // cnt_flags = append(cnt_flags, false)
2474 // check_cntr = true
2475 // zz.MeasTypes.SMeasTypesList = temp_mtl
2482 // var temp_mvl []MeasValues
2483 // for _, yy := range zz.MeasValuesList {
2484 // keep_class := false
2485 // keep_inst := false
2486 // keep_cntr := false
2488 // dna := strings.Split(yy.MeasObjInstID, ",")
2489 // instName := dna[len(dna)-1]
2490 // cls := strings.Split(dna[len(dna)-1], "=")[0]
2492 // if len(measObjClassMap) > 0 {
2493 // if measObjClassMap[cls] {
2494 // keep_class = true
2497 // keep_class = true
2500 // if len(measObjInstIdsMap) > 0 {
2501 // if measObjInstIdsMap[instName] {
2509 // var temp_mrl []MeasResults
2511 // for _, v := range yy.MeasResultsList {
2512 // if cnt_flags[v.P-1] {
2515 // temp_mrl = append(temp_mrl, v)
2518 // yy.MeasResultsList = temp_mrl
2523 // if keep_class && keep_cntr && keep_inst {
2525 // temp_mvl = append(temp_mvl, yy)
2529 // zz.MeasValuesList = temp_mvl
2530 // temp_mil = append(temp_mil, zz)
2535 // //Only if modified
2537 // if len(temp_mil) == 0 {
2538 // log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
2541 // data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
2544 // log.Debug("Filter: ", time.Since(start).String())
2546 // start = time.Now()
2547 // j, err := jsoniter.Marshal(&data)
2549 // log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
2552 // log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
2556 // log.Debug("Filtered json obj: ", resource, " len: ", len(j))
2560 func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
2562 log.Info("Type job", type_id, " started")
2563 log.Debug("influx job ch ", data_in_ch)
2564 filters := make(map[string]Filter)
2565 filterParams_list := make(map[string]FilterMaps)
2566 influx_job_params := make(map[string]InfluxJobParameters)
2567 var mc *minio.Client
2568 const mc_id = "mc_" + "start_job_json_file_data_influx"
2572 case job_ctl := <-control_ch:
2573 log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
2574 switch job_ctl.command {
2576 //ignore cmd - handled by channel signal
2579 filters[job_ctl.filter.JobId] = job_ctl.filter
2580 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
2581 log.Debug(job_ctl.filter)
2582 tmp_filterParams_list := make(map[string]FilterMaps)
2583 for k, v := range filterParams_list {
2584 tmp_filterParams_list[k] = v
2586 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
2587 filterParams_list = tmp_filterParams_list
2589 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2590 for k, v := range influx_job_params {
2591 tmp_influx_job_params[k] = v
2593 tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
2594 influx_job_params = tmp_influx_job_params
2596 case "REMOVE-FILTER":
2598 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
2600 tmp_filterParams_list := make(map[string]FilterMaps)
2601 for k, v := range filterParams_list {
2602 tmp_filterParams_list[k] = v
2604 delete(tmp_filterParams_list, job_ctl.filter.JobId)
2605 filterParams_list = tmp_filterParams_list
2607 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2608 for k, v := range influx_job_params {
2609 tmp_influx_job_params[k] = v
2611 delete(tmp_influx_job_params, job_ctl.filter.JobId)
2612 influx_job_params = tmp_influx_job_params
2615 case msg := <-data_in_ch:
2616 log.Debug("Data reveived - influx")
2618 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
2626 mc, err = create_minio_client(mc_id)
2628 log.Debug("Cannot create minio client for type job: ", type_id)
2632 //TODO: Sort processed file conversions in order (FIFO)
2633 jobLimiterChan <- struct{}{}
2634 go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
2636 case <-time.After(1 * time.Second):
2644 func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2646 log.Debug("run_json_file_data_job_influx")
2654 var evt_data FileDownloadedEvt
2655 err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2657 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2660 log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2662 var reader io.Reader
2665 //INPUTBUCKET := "json-file-ready"
2666 INPUTBUCKET := "pm-files-json"
2668 if objectstore == false {
2669 filename = files_volume + "/" + evt_data.Filename
2670 fi, err := os.Open(filename)
2673 log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
2679 filename = "/" + evt_data.Filename
2681 if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
2682 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2685 tctx := context.Background()
2686 mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2688 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2694 log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2700 if strings.HasSuffix(filename, "gz") {
2702 var buf2 bytes.Buffer
2703 errb := gunzipReaderToWriter(&buf2, reader)
2705 log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2710 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2714 d, err := io.ReadAll(reader)
2716 log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2721 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2723 // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
2724 // var pmfile PMJsonFile
2725 // start := time.Now()
2726 // err = jsoniter.Unmarshal(*data, &pmfile)
2727 // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2730 // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2733 for k, v := range filterList {
2735 var pmfile PMJsonFile
2737 err = jsoniter.Unmarshal(*data, &pmfile)
2738 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2741 log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2745 if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
2746 b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2748 log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
2753 fluxParms := influxList[k]
2754 log.Debug("Influxdb params: ", fluxParms)
2755 client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
2756 writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
2758 // fmt.Println(pmfile.Event.CommonEventHeader.StartEpochMicrosec)
2759 // tUnix := pmfile.Event.CommonEventHeader.StartEpochMicrosec / int64(time.Millisecond)
2760 // tUnixNanoRemainder := (pmfile.Event.CommonEventHeader.StartEpochMicrosec % int64(time.Millisecond)) * int64(time.Microsecond)
2761 // timeT := time.Unix(tUnix, tUnixNanoRemainder)
2762 // fmt.Println(timeT)
2763 // fmt.Println("======================")
2764 for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2765 ctr_names := make(map[string]string)
2766 for cni, cn := range zz.MeasTypes.SMeasTypesList {
2767 ctr_names[string(cni+1)] = cn
2769 for _, xx := range zz.MeasValuesList {
2770 log.Debug("Measurement: ", xx.MeasObjInstID)
2771 log.Debug("Suspect flag: ", xx.SuspectFlag)
2772 p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
2773 p.AddField("suspectflag", xx.SuspectFlag)
2774 p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
2775 p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
2776 for _, yy := range xx.MeasResultsList {
2780 log.Debug("Counter: ", pn, " Value: ", pv)
2781 pv_i, err := strconv.Atoi(pv)
2783 p.AddField(pn, pv_i)
2789 log.Debug("StartEpochMicrosec from common event header: ", pmfile.Event.CommonEventHeader.StartEpochMicrosec)
2790 log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2791 p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2792 err := writeAPI.WritePoint(context.Background(), p)
2794 log.Error("Db write error: ", err)