// ============LICENSE_START===============================================
// Copyright (C) 2023 Nordix Foundation. All rights reserved.
// ========================================================================
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ============LICENSE_END=================================================
43 "github.com/google/uuid"
44 "golang.org/x/oauth2/clientcredentials"
46 log "github.com/sirupsen/logrus"
48 "github.com/gorilla/mux"
52 "github.com/confluentinc/confluent-kafka-go/kafka"
53 influxdb2 "github.com/influxdata/influxdb-client-go/v2"
54 jsoniter "github.com/json-iterator/go"
55 "github.com/minio/minio-go/v7"
56 "github.com/minio/minio-go/v7/pkg/credentials"
const https_port = 443
const config_file = "application_configuration.json"
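// Illustrative only - a minimal application_configuration.json following the
// json tags on the DataType/DataTypes structs below. The type id and topic
// names are example values, not taken from a real deployment:
//
//	{
//	  "types": [
//	    {
//	      "id": "xml-file-data-to-filestore",
//	      "kafkaInputTopic": "file-ready",
//	      "inputJobType": "xml-file-data-to-filestore",
//	      "inputJobDefinition": {
//	        "kafkaOutputTopic": "file-ready"
//	      }
//	    }
//	  ]
//	}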
const server_crt = "server.crt"
const server_key = "server.key"
const producer_name = "kafka-producer"
const registration_delay_short = 2
const registration_delay_long = 120
const reader_queue_length = 100 //Per type job
const writer_queue_length = 100 //Per info job
const parallelism_limiter = 100 //For all jobs
// These are optional - only needed if the SASL protocol is used towards kafka
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
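// Note: these credentials feed the OAuth2 client-credentials flow in
// fetch_token() and also decide whether the kafka clients are created with
// SASL/OAUTHBEARER (creds_grant_type != "") or as plain text clients.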
type FilterParameters struct {
MeasuredEntityDns []string `json:"measuredEntityDns"`
MeasTypes []string `json:"measTypes"`
MeasObjClass []string `json:"measObjClass"`
MeasObjInstIds []string `json:"measObjInstIds"`
type InfoJobDataType struct {
KafkaOutputTopic string `json:"kafkaOutputTopic"`
DbUrl string `json:"db-url"`
DbOrg string `json:"db-org"`
DbBucket string `json:"db-bucket"`
DbToken string `json:"db-token"`
FilterParams FilterParameters `json:"filter"`
} `json:"info_job_data"`
InfoJobIdentity string `json:"info_job_identity"`
InfoTypeIdentity string `json:"info_type_identity"`
LastUpdated string `json:"last_updated"`
Owner string `json:"owner"`
TargetURI string `json:"target_uri"`
// Type for an info job
type InfoJobRecord struct {
job_info InfoJobDataType
statistics *InfoJobStats
// Type for a type job
type TypeJobRecord struct {
data_in_channel chan *KafkaPayload
reader_control chan ReaderControl
job_control chan JobControl
statistics *TypeJobStats
// Type for controlling the topic reader
type ReaderControl struct {
// Type for controlling the topic writer
type WriterControl struct {
// Type for controlling the job
type JobControl struct {
type KafkaPayload struct {
type FilterMaps struct {
sourceNameMap map[string]bool
measObjClassMap map[string]bool
measObjInstIdsMap map[string]bool
measTypesMap map[string]bool
type InfluxJobParameters struct {
influxParameters InfluxJobParameters
// Type for info job statistics
type InfoJobStats struct {
// Type for type job statistics
type TypeJobStats struct {
// == API Datatypes ==//
// Type for supported data types
type DataType struct {
ID string `json:"id"`
KafkaInputTopic string `json:"kafkaInputTopic"`
InputJobType string `json:"inputJobType"`
InputJobDefinition struct {
KafkaOutputTopic string `json:"kafkaOutputTopic"`
} `json:"inputJobDefinition"`
type DataTypes struct {
ProdDataTypes []DataType `json:"types"`
type Minio_buckets struct {
Buckets map[string]bool
//== External data types ==//
// Data type for event xml file download
type XmlFileEventHeader struct {
ProductName string `json:"productName"`
VendorName string `json:"vendorName"`
Location string `json:"location"`
Compression string `json:"compression"`
SourceName string `json:"sourceName"`
FileFormatType string `json:"fileFormatType"`
FileFormatVersion string `json:"fileFormatVersion"`
StartEpochMicrosec int64 `json:"startEpochMicrosec"`
LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
Name string `json:"name"`
ChangeIdentifier string `json:"changeIdentifier"`
InternalLocation string `json:"internalLocation"`
TimeZoneOffset string `json:"timeZoneOffset"`
//ObjectStoreBucket string `json:"objectStoreBucket"`
// Data types for input xml file
type MeasCollecFile struct {
XMLName xml.Name `xml:"measCollecFile"`
Text string `xml:",chardata"`
Xmlns string `xml:"xmlns,attr"`
Xsi string `xml:"xsi,attr"`
SchemaLocation string `xml:"schemaLocation,attr"`
Text string `xml:",chardata"`
FileFormatVersion string `xml:"fileFormatVersion,attr"`
VendorName string `xml:"vendorName,attr"`
DnPrefix string `xml:"dnPrefix,attr"`
Text string `xml:",chardata"`
LocalDn string `xml:"localDn,attr"`
ElementType string `xml:"elementType,attr"`
Text string `xml:",chardata"`
BeginTime string `xml:"beginTime,attr"`
Text string `xml:",chardata"`
ManagedElement struct {
Text string `xml:",chardata"`
LocalDn string `xml:"localDn,attr"`
SwVersion string `xml:"swVersion,attr"`
} `xml:"managedElement"`
Text string `xml:",chardata"`
MeasInfoId string `xml:"measInfoId,attr"`
Text string `xml:",chardata"`
JobId string `xml:"jobId,attr"`
Text string `xml:",chardata"`
Duration string `xml:"duration,attr"`
EndTime string `xml:"endTime,attr"`
Text string `xml:",chardata"`
Duration string `xml:"duration,attr"`
Text string `xml:",chardata"`
P string `xml:"p,attr"`
Text string `xml:",chardata"`
MeasObjLdn string `xml:"measObjLdn,attr"`
Text string `xml:",chardata"`
P string `xml:"p,attr"`
Suspect string `xml:"suspect"`
Text string `xml:",chardata"`
Text string `xml:",chardata"`
EndTime string `xml:"endTime,attr"`
// Data type for json file
// Split into several parts to allow add/remove in lists
type MeasResults struct {
SValue string `json:"sValue"`
type MeasValues struct {
MeasObjInstID string `json:"measObjInstId"`
SuspectFlag string `json:"suspectFlag"`
MeasResultsList []MeasResults `json:"measResults"`
type SMeasTypes struct {
SMeasType string `json:"sMeasTypesList"`
type MeasInfoList struct {
SMeasInfoID string `json:"sMeasInfoId"`
} `json:"measInfoId"`
SMeasTypesList []string `json:"sMeasTypesList"`
MeasValuesList []MeasValues `json:"measValuesList"`
type PMJsonFile struct {
CommonEventHeader struct {
Domain string `json:"domain"`
EventID string `json:"eventId"`
Sequence int `json:"sequence"`
EventName string `json:"eventName"`
SourceName string `json:"sourceName"`
ReportingEntityName string `json:"reportingEntityName"`
Priority string `json:"priority"`
StartEpochMicrosec int64 `json:"startEpochMicrosec"`
LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
Version string `json:"version"`
VesEventListenerVersion string `json:"vesEventListenerVersion"`
TimeZoneOffset string `json:"timeZoneOffset"`
} `json:"commonEventHeader"`
Perf3GppFields struct {
Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
MeasDataCollection struct {
GranularityPeriod int `json:"granularityPeriod"`
MeasuredEntityUserName string `json:"measuredEntityUserName"`
MeasuredEntityDn string `json:"measuredEntityDn"`
MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"`
SMeasInfoList []MeasInfoList `json:"measInfoList"`
} `json:"measDataCollection"`
} `json:"perf3gppFields"`
// Data type for converted json file message
type FileDownloadedEvt struct {
Filename string `json:"filename"`
// Lock for all internal data
var datalock sync.Mutex
var producer_instance_name string = producer_name
// Keep all info type jobs, key == type id
var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
// Keep all info jobs, key == job id
var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
var InfoTypes DataTypes
// Limiter - valid for all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
// TODO: Config param?
var bucket_location = "swe"
var httpclient = &http.Client{}
// == Env variables ==//
var bootstrapserver = os.Getenv("KAFKA_SERVER")
var files_volume = os.Getenv("FILES_VOLUME")
var ics_server = os.Getenv("ICS")
var self = os.Getenv("SELF")
var filestore_user = os.Getenv("FILESTORE_USER")
var filestore_pwd = os.Getenv("FILESTORE_PWD")
var filestore_server = os.Getenv("FILESTORE_SERVER")
var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
var writer_control = make(chan WriterControl, 1)
var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
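// minio_bucketlist is an in-memory cache of buckets already known to exist,
// keyed per minio client id - see add_bucket()/bucket_exist() below. It only
// avoids repeated BucketExists calls; minio itself stays the source of truth.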
//log.SetLevel(log.InfoLevel)
log.SetLevel(log.TraceLevel)
log.Info("Server starting...")
log.Panic("Env SELF not configured")
if bootstrapserver == "" {
log.Panic("Env KAFKA_SERVER not set")
if ics_server == "" {
log.Panic("Env ICS not set")
if os.Getenv("KP") != "" {
producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
rtr := mux.NewRouter()
rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
rtr.HandleFunc("/statistics", statistics)
rtr.HandleFunc("/logging/{level}", logging_level)
rtr.HandleFunc("/logging", logging_level)
rtr.HandleFunc("/", alive)
//For perf/mem profiling
rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
http.Handle("/", rtr)
http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
cer, err := tls.LoadX509KeyPair(server_crt, server_key)
log.Error("Cannot load key and cert - ", err)
config := &tls.Config{Certificates: []tls.Certificate{cer}}
https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
455 log.Info("Starting http service...")
456 err := http_server.ListenAndServe()
457 if err == http.ErrServerClosed { // graceful shutdown
458 log.Info("http server shutdown...")
459 } else if err != nil {
460 log.Error("http server error: ", err)
466 log.Info("Starting https service...")
467 err := https_server.ListenAndServe()
468 if err == http.ErrServerClosed { // graceful shutdown
469 log.Info("https server shutdown...")
470 } else if err != nil {
471 log.Error("https server error: ", err)
474 check_tcp(strconv.Itoa(http_port))
475 check_tcp(strconv.Itoa(https_port))
go start_topic_writer(writer_control, data_out_channel)
//Setup proc for periodic type registration
var event_chan = make(chan int) //Channel for stopping the proc
go periodic_registration(event_chan)
//Wait for term/int signal to try to shut down gracefully
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("Received signal %s - application will terminate\n", sig)
event_chan <- 0 // Stop periodic registration
defer datalock.Unlock()
AppState = Terminating
http_server.Shutdown(context.Background())
https_server.Shutdown(context.Background())
for key := range TypeJobs {
log.Info("Stopping type job: ", key)
for _, dp := range InfoTypes.ProdDataTypes {
//Wait until all goroutines have exited
fmt.Println("main routine exit")
fmt.Println("server stopped")
func check_tcp(port string) {
log.Info("Checking tcp port: ", port)
address := net.JoinHostPort("localhost", port)
conn, err := net.DialTimeout("tcp", address, 3*time.Second)
log.Info("Checking tcp port: ", port, " failed, retrying...")
log.Info("Checking tcp port: ", port, " - OK")
log.Info("Checking tcp port: ", port, " failed, retrying...")
//== Core functions ==//
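// Overall data flow (as wired up in main and start_type_job):
//
//	kafka input topic -> start_topic_reader -> data_in_channel
//	  -> start_job_* (one per type, applies per-job filters)
//	  -> data_out_channel -> start_topic_writer -> kafka output topic(s)
//
// Each info job only adds a filter and an output topic to the type job that
// owns the input topic; the influx variant writes to influxdb instead of
// producing to an output topic.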
// Run periodic registration of producers
func periodic_registration(evtch chan int) {
if msg == 0 { // Stop thread
case <-time.After(time.Duration(delay) * time.Second):
ok := register_producer()
delay = registration_delay_long
if delay < registration_delay_long {
delay += registration_delay_short
delay = registration_delay_short
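// Registration cadence: after a successful registration the delay is set to
// registration_delay_long (120 s); after a failure it grows in
// registration_delay_short (2 s) steps until it reaches the long delay and
// then starts over from the short delay.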
func register_producer() bool {
log.Info("Registering producer: ", producer_instance_name)
file, err := os.ReadFile(config_file)
log.Error("Cannot read config file: ", config_file)
log.Error("Registering producer: ", producer_instance_name, " - failed")
err = jsoniter.Unmarshal(file, &data)
log.Error("Cannot parse config file: ", config_file)
log.Error("Registering producer: ", producer_instance_name, " - failed")
var new_type_names []string
for i := 0; i < len(data.ProdDataTypes); i++ {
t1 := make(map[string]interface{})
t2 := make(map[string]interface{})
t2["schema"] = "http://json-schema.org/draft-07/schema#"
t2["title"] = data.ProdDataTypes[i].ID
t2["description"] = data.ProdDataTypes[i].ID
t2["type"] = "object"
t1["info_job_data_schema"] = t2
json, err := json.Marshal(t1)
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
log.Error("Registering producer: ", producer_instance_name, " - failed")
ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
log.Error("Registering producer: ", producer_instance_name, " - failed")
new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
log.Debug("Registering types: ", new_type_names)
m := make(map[string]interface{})
m["supported_info_types"] = new_type_names
m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
json, err := json.Marshal(m)
log.Error("Cannot create json for producer: ", producer_instance_name)
log.Error("Registering producer: ", producer_instance_name, " - failed")
ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
log.Error("Cannot register producer: ", producer_instance_name)
log.Error("Registering producer: ", producer_instance_name, " - failed")
defer datalock.Unlock()
var current_type_names []string
for _, v := range InfoTypes.ProdDataTypes {
current_type_names = append(current_type_names, v.ID)
if contains_str(new_type_names, v.ID) {
log.Debug("Type ", v.ID, " exists")
log.Info("Removing type job for type: ", v.ID, " Type not in configuration")
for _, v := range data.ProdDataTypes {
if contains_str(current_type_names, v.ID) {
log.Debug("Type ", v.ID, " exists")
log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
log.Debug("Datatypes: ", InfoTypes)
log.Info("Registering producer: ", producer_instance_name, " - OK")
func remove_type_job(dp DataType) {
log.Info("Removing type job: ", dp.ID)
j, ok := TypeJobs[dp.ID]
j.reader_control <- ReaderControl{"EXIT"}
if dp.ext_job_created {
dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
log.Error("Cannot delete job: ", dp.ext_job_id)
dp.ext_job_created = false
func start_type_job(dp DataType) {
log.Info("Starting type job: ", dp.ID)
job_record := TypeJobRecord{}
job_record.job_control = make(chan JobControl, 1)
job_record.reader_control = make(chan ReaderControl, 1)
job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
job_record.InfoType = dp.ID
job_record.InputTopic = dp.KafkaInputTopic
job_record.groupId = "kafka-procon-" + dp.ID
job_record.clientId = dp.ID + "-" + os.Getenv("KP")
var stats TypeJobStats
job_record.statistics = &stats
case "xml-file-data-to-filestore":
go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
case "xml-file-data":
go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
case "json-file-data-from-filestore":
go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
case "json-file-data":
go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
case "json-file-data-from-filestore-to-influx":
go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)
TypeJobs[dp.ID] = job_record
log.Debug("Type job input type: ", dp.InputJobType)
func create_ext_job(dp DataType) {
if dp.InputJobType != "" {
jb := make(map[string]interface{})
jb["info_type_id"] = dp.InputJobType
jb["job_owner"] = "console" //TODO:
jb["status_notification_uri"] = "http://callback:80/post"
jb1 := make(map[string]interface{})
jb["job_definition"] = jb1
jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic
json, err := json.Marshal(jb)
dp.ext_job_created = false
log.Error("Cannot create json for type: ", dp.InputJobType)
dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
log.Error("Cannot register job: ", dp.InputJobType)
log.Debug("Registered job ok: ", dp.InputJobType)
dp.ext_job_created = true
func remove_info_job(jobid string) {
log.Info("Removing info job: ", jobid)
jc.command = "REMOVE-FILTER"
infoJob := InfoJobs[jobid]
typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
typeJob.job_control <- jc
delete(InfoJobs, jobid)
// == Helper functions ==//
// Function to check the status of a mutex lock
func MutexLocked(m *sync.Mutex) bool {
state := reflect.ValueOf(m).Elem().FieldByName("state")
return state.Int()&mutexLocked == mutexLocked
// Test if slice contains a string
func contains_str(s []string, e string) bool {
for _, a := range s {
// Send a http request with json (json may be nil)
func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
// set the HTTP method, url, and request body
var req *http.Request
req, err = http.NewRequest(method, url, http.NoBody)
req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
req.Header.Set("Content-Type", "application/json; charset=utf-8")
log.Error("Cannot create http request, method: ", method, " url: ", url)
token, err := fetch_token()
log.Error("Cannot fetch token for http request: ", err)
req.Header.Set("Authorization", "Bearer "+token.TokenValue)
log.Debug("HTTP request: ", req)
log.Debug("Sending http request")
resp, err2 := httpclient.Do(req)
log.Error("Http request error: ", err2)
log.Error("Cannot send http request method: ", method, " url: ", url)
if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
log.Debug("Accepted http status: ", resp.StatusCode)
log.Debug("HTTP resp: ", resp)
func fetch_token() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{
ClientID: creds_client_id,
ClientSecret: creds_client_secret,
TokenURL: creds_service_url,
token, err := conf.Token(context.Background())
log.Warning("Cannot fetch access token: ", err)
extensions := map[string]string{}
log.Debug("=====================================================")
log.Debug("token: ", token)
log.Debug("=====================================================")
log.Debug("TokenValue: ", token.AccessToken)
log.Debug("=====================================================")
log.Debug("Expiration: ", token.Expiry)
oauthBearerToken := kafka.OAuthBearerToken{
TokenValue: token.AccessToken,
Extensions: extensions,
return &oauthBearerToken, nil
// Function to print memory details
// https://pkg.go.dev/runtime#MemStats
func PrintMemUsage() {
if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
fmt.Printf("\tNumGC = %v\n", m.NumGC)
fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
func generate_uuid_from_type(s string) string {
uuid, _ := uuid.FromBytes(b)
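// Note: generate_uuid_from_type derives a deterministic, uuid-formatted id
// from the type name string, so the same input job type always maps to the
// same external ICS job id - create_ext_job and remove_type_job both
// recompute it the same way.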
// Write gzipped data to a Writer
func gzipWrite(w io.Writer, data *[]byte) error {
gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
_, err2 := gw.Write(*data)
// Write gunzipped data from Reader to a Writer
func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
gr, err1 := gzip.NewReader(data)
data2, err2 := io.ReadAll(gr)
_, err3 := w.Write(data2)
func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
tctx := context.Background()
err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
// Check to see if we already own this bucket (which happens if you run this twice)
exists, errBucketExists := mc.BucketExists(tctx, bucket)
if errBucketExists == nil && exists {
log.Debug("Already own bucket: ", bucket)
add_bucket(client_id, bucket)
log.Error("Cannot create or check bucket ", bucket, " in minio client, ", err)
log.Debug("Successfully created bucket: ", bucket)
add_bucket(client_id, bucket)
func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
ok := bucket_exist(client_id, bucket)
tctx := context.Background()
exists, err := mc.BucketExists(tctx, bucket)
if err == nil && exists {
log.Debug("Already own bucket: ", bucket)
log.Error("Bucket does not exist, bucket ", bucket, " in minio client, ", err)
func add_bucket(minio_id string, bucket string) {
defer datalock.Unlock()
b, ok := minio_bucketlist[minio_id]
b.Buckets = make(map[string]bool)
b.Buckets[bucket] = true
minio_bucketlist[minio_id] = b
func bucket_exist(minio_id string, bucket string) bool {
defer datalock.Unlock()
b, ok := minio_bucketlist[minio_id]
_, ok = b.Buckets[bucket]
//== http api functions ==//
func create_job(w http.ResponseWriter, req *http.Request) {
log.Debug("Create job, http method: ", req.Method)
if req.Method != http.MethodPost {
log.Error("Create job, http method not allowed")
w.WriteHeader(http.StatusMethodNotAllowed)
ct := req.Header.Get("Content-Type")
if ct != "application/json" {
log.Error("Create job, bad content type")
http.Error(w, "Bad content type", http.StatusBadRequest)
var t InfoJobDataType
err := json.NewDecoder(req.Body).Decode(&t)
log.Error("Create job, cannot parse json, ", err)
http.Error(w, "Cannot parse json", http.StatusBadRequest)
log.Debug("Creating job, id: ", t.InfoJobIdentity)
defer datalock.Unlock()
job_id := t.InfoJobIdentity
job_record, job_found := InfoJobs[job_id]
type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
log.Error("Type ", t.InfoTypeIdentity, " does not exist")
http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
} else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
log.Error("Job cannot change type")
http.Error(w, "Job cannot change type", http.StatusBadRequest)
} else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
log.Error("Job cannot change topic")
http.Error(w, "Job cannot change topic", http.StatusBadRequest)
} else if !found_type {
//Should never happen, if the type is removed then the job is stopped
log.Error("Type ", t.InfoTypeIdentity, " does not exist")
http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
job_record = InfoJobRecord{}
job_record.job_info = t
output_topic := t.InfoJobData.KafkaOutputTopic
job_record.output_topic = t.InfoJobData.KafkaOutputTopic
log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic ", output_topic)
var stats InfoJobStats
job_record.statistics = &stats
filter.JobId = job_id
filter.OutputTopic = job_record.output_topic
jc.command = "ADD-FILTER"
if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
fm.sourceNameMap = make(map[string]bool)
fm.measObjClassMap = make(map[string]bool)
fm.measObjInstIdsMap = make(map[string]bool)
fm.measTypesMap = make(map[string]bool)
if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
fm.sourceNameMap[v] = true
if t.InfoJobData.FilterParams.MeasObjClass != nil {
for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
fm.measObjClassMap[v] = true
if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
fm.measObjInstIdsMap[v] = true
if t.InfoJobData.FilterParams.MeasTypes != nil {
for _, v := range t.InfoJobData.FilterParams.MeasTypes {
fm.measTypesMap[v] = true
if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
influxparam := InfluxJobParameters{}
influxparam.DbUrl = t.InfoJobData.DbUrl
influxparam.DbOrg = t.InfoJobData.DbOrg
influxparam.DbBucket = t.InfoJobData.DbBucket
influxparam.DbToken = t.InfoJobData.DbToken
filter.influxParameters = influxparam
InfoJobs[job_id] = job_record
type_job_record.job_control <- jc
func delete_job(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodDelete {
w.WriteHeader(http.StatusMethodNotAllowed)
defer datalock.Unlock()
vars := mux.Vars(req)
if id, ok := vars["job_id"]; ok {
if _, ok := InfoJobs[id]; ok {
w.WriteHeader(http.StatusNoContent)
log.Info("Job ", id, " deleted")
w.WriteHeader(http.StatusNotFound)
func supervise_job(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
defer datalock.Unlock()
vars := mux.Vars(req)
log.Debug("Supervising, job: ", vars["job_id"])
if id, ok := vars["job_id"]; ok {
if _, ok := InfoJobs[id]; ok {
log.Debug("Supervision ok, job ", id)
w.WriteHeader(http.StatusNotFound)
// producer supervision
func supervise_producer(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
w.WriteHeader(http.StatusOK)
// producer statistics, all jobs
func statistics(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
m := make(map[string]interface{})
log.Debug("producer statistics, locked? ", MutexLocked(&datalock))
defer datalock.Unlock()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
m["number-of-jobs"] = len(InfoJobs)
m["number-of-types"] = len(InfoTypes.ProdDataTypes)
qm := make(map[string]interface{})
for key, elem := range InfoJobs {
jm := make(map[string]interface{})
jm["type"] = elem.job_info.InfoTypeIdentity
typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
jm["groupId"] = typeJob.groupId
jm["clientID"] = typeJob.clientId
jm["input topic"] = typeJob.InputTopic
jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
jm["output topic"] = elem.output_topic
jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
jm["msg_out (job)"] = elem.statistics.out_msg_cnt
json, err := json.Marshal(m)
w.WriteHeader(http.StatusInternalServerError)
log.Error("Cannot marshal statistics json")
_, err = w.Write(json)
w.WriteHeader(http.StatusInternalServerError)
log.Error("Cannot send statistics json")
// Simple alive check
func alive(w http.ResponseWriter, req *http.Request) {
// Get/Set logging level
func logging_level(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if level, ok := vars["level"]; ok {
if req.Method == http.MethodPut {
log.SetLevel(log.TraceLevel)
log.SetLevel(log.DebugLevel)
log.SetLevel(log.InfoLevel)
log.SetLevel(log.WarnLevel)
log.SetLevel(log.ErrorLevel)
log.SetLevel(log.FatalLevel)
log.SetLevel(log.PanicLevel)
w.WriteHeader(http.StatusNotFound)
w.WriteHeader(http.StatusMethodNotAllowed)
if req.Method == http.MethodGet {
if log.IsLevelEnabled(log.PanicLevel) {
} else if log.IsLevelEnabled(log.FatalLevel) {
} else if log.IsLevelEnabled(log.ErrorLevel) {
} else if log.IsLevelEnabled(log.WarnLevel) {
} else if log.IsLevelEnabled(log.InfoLevel) {
} else if log.IsLevelEnabled(log.DebugLevel) {
} else if log.IsLevelEnabled(log.TraceLevel) {
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(msg))
w.WriteHeader(http.StatusMethodNotAllowed)
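// Illustrative usage of the logging endpoint registered in main (host/port
// and the chosen level name are deployment specific):
//
//	curl -X PUT http://localhost:<http_port>/logging/debug   # set level
//	curl http://localhost:<http_port>/logging                # read current level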
func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
var c *kafka.Consumer = nil
for !topic_ok {
case reader_ctrl := <-control_ch:
if reader_ctrl.command == "EXIT" {
log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
data_ch <- nil //Signal to job handler
case <-time.After(1 * time.Second):
c = create_kafka_consumer(type_id, gid, cid)
log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
if c != nil && !topic_ok {
err := c.SubscribeTopics([]string{topic}, nil)
log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
var event_chan = make(chan int)
case evt := <-c.Events():
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", evt)
token, err := fetch_token()
log.Warning("Cannot fetch token: ", err)
c.SetOAuthBearerTokenFailure(err.Error())
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
log.Warning("Cannot set token: ", setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
case msg := <-event_chan:
case <-time.After(1 * time.Second):
case reader_ctrl := <-control_ch:
if reader_ctrl.command == "EXIT" {
log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
data_ch <- nil //Signal to job handler
log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
switch e := ev.(type) {
case *kafka.Message:
var kmsg KafkaPayload
log.Debug("Reader msg: ", &kmsg)
log.Debug("Reader - data_ch ", data_ch)
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", ev)
token, err := fetch_token()
log.Warning("Cannot fetch token: ", err)
c.SetOAuthBearerTokenFailure(err.Error())
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
log.Warning("Cannot set token: ", setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
fmt.Printf("Ignored %v\n", e)
func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
var kafka_producer *kafka.Producer
log.Info("Topic writer starting")
// Wait for kafka producer to become available - and be prepared to exit the writer
for kafka_producer == nil {
case writer_ctl := <-control_ch:
if writer_ctl.command == "EXIT" {
kafka_producer = start_producer()
if kafka_producer == nil {
log.Debug("Could not start kafka producer - retrying")
time.Sleep(1 * time.Second)
log.Debug("Kafka producer started")
var event_chan = make(chan int)
case evt := <-kafka_producer.Events():
case *kafka.Message:
m := evt.(*kafka.Message)
if m.TopicPartition.Error != nil {
log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
log.Debug("Dumping topic writer event, error: ", evt)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New producer token needed: ", evt)
token, err := fetch_token()
log.Warning("Cannot fetch token: ", err)
kafka_producer.SetOAuthBearerTokenFailure(err.Error())
setTokenError := kafka_producer.SetOAuthBearerToken(*token)
if setTokenError != nil {
log.Warning("Cannot set token: ", setTokenError)
kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
log.Debug("Dumping topic writer event, unknown: ", evt)
case msg := <-event_chan:
case <-time.After(1 * time.Second):
case writer_ctl := <-control_ch:
if writer_ctl.command == "EXIT" {
// ignore - wait for channel signal
case kmsg := <-data_ch:
log.Info("Topic writer stopped by channel signal - start_topic_writer")
defer kafka_producer.Close()
for retry := 1; retry <= retries && !msg_ok; retry++ {
err = kafka_producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
incr_out_msg_cnt(kmsg.jobid)
log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
time.Sleep(time.Duration(retry) * time.Second)
log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
case <-time.After(1000 * time.Millisecond):
func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
var cm kafka.ConfigMap
if creds_grant_type == "" {
log.Info("Creating kafka plain text consumer for type: ", type_id)
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"auto.offset.reset": "latest",
"enable.auto.commit": false,
log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"auto.offset.reset": "latest",
"enable.auto.commit": false,
"sasl.mechanism": "OAUTHBEARER",
"security.protocol": "SASL_PLAINTEXT",
c, err := kafka.NewConsumer(&cm)
log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
log.Info("Created kafka consumer for type: ", type_id, " OK")
// Start kafka producer
func start_producer() *kafka.Producer {
log.Info("Creating kafka producer")
var cm kafka.ConfigMap
if creds_grant_type == "" {
log.Info("Creating kafka plain text producer")
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
log.Info("Creating kafka SASL plain text producer")
cm = kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"sasl.mechanism": "OAUTHBEARER",
"security.protocol": "SASL_PLAINTEXT",
p, err := kafka.NewProducer(&cm)
log.Error("Cannot create kafka producer, ", err)
func start_adminclient() *kafka.AdminClient {
log.Info("Creating kafka admin client")
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
log.Error("Cannot create kafka admin client, ", err)
func create_minio_client(id string) (*minio.Client, *error) {
log.Debug("Get minio client")
minio_client, err := minio.New(filestore_server, &minio.Options{
Creds: credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
log.Error("Cannot create minio client, ", err)
return minio_client, nil
func incr_out_msg_cnt(jobid string) {
j, ok := InfoJobs[jobid]
j.statistics.out_msg_cnt++
func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
log.Info("Type job ", type_id, " started")
filters := make(map[string]Filter)
topic_list := make(map[string]string)
var mc *minio.Client
const mc_id = "mc_" + "start_job_xml_file_data"
case job_ctl := <-control_ch:
log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
switch job_ctl.command {
//ignore cmd - handled by channel signal
filters[job_ctl.filter.JobId] = job_ctl.filter
log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
tmp_topic_list := make(map[string]string)
for k, v := range topic_list {
tmp_topic_list[k] = v
tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
topic_list = tmp_topic_list
case "REMOVE-FILTER":
log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
tmp_topic_list := make(map[string]string)
for k, v := range topic_list {
tmp_topic_list[k] = v
delete(tmp_topic_list, job_ctl.filter.JobId)
topic_list = tmp_topic_list
case msg := <-data_in_ch:
log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
if fsbucket != "" && fvolume == "" {
mc, err = create_minio_client(mc_id)
log.Debug("Cannot create minio client for type job: ", type_id)
jobLimiterChan <- struct{}{}
go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
case <-time.After(1 * time.Second):
func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
if fvolume == "" && fsbucket == "" {
log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
} else if (fvolume != "") && (fsbucket != "") {
log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
var evt_data XmlFileEventHeader
err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details: ", err)
log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
var reader io.Reader
INPUTBUCKET := "ropfiles"
filename = fvolume + "/" + evt_data.Name
fi, err := os.Open(filename)
log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
filename = evt_data.Name
tctx := context.Background()
mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err)
log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message")
log.Error("Cannot get: ", filename, " - null reader")
var file_bytes []byte
if strings.HasSuffix(filename, "gz") {
var buf3 bytes.Buffer
errb := gunzipReaderToWriter(&buf3, reader)
log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
file_bytes = buf3.Bytes()
log.Debug("Gunzip file: ", filename, " time: ", time.Since(start).String(), " len: ", len(file_bytes))
var buf3 bytes.Buffer
_, err2 := io.Copy(&buf3, reader)
log.Error("File ", filename, " - cannot be read, discarding message, ", err2)
file_bytes = buf3.Bytes()
b, err := xml_to_json_conv(&file_bytes, &evt_data)
log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
log.Debug("Converted file to json: ", filename, " time: ", time.Since(start).String(), " len: ", len(b))
new_fn := evt_data.Name + os.Getenv("KP") + ".json"
if outputCompression == "gz" {
new_fn = new_fn + ".gz"
var buf bytes.Buffer
err = gzipWrite(&buf, &b)
log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), " len: ", len(file_bytes))
err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
log.Error("Cannot write file ", new_fn, " - discarding message, ", err)
log.Debug("Write file to disk: "+new_fn, " time: ", time.Since(start).String(), " len: ", len(file_bytes))
} else if fsbucket != "" {
objectName := new_fn
contentType := "application/json"
if strings.HasSuffix(objectName, ".gz") {
contentType = "application/gzip"
// Upload the converted json file with PutObject
r := bytes.NewReader(b)
tctx := context.Background()
if !check_minio_bucket(mc, mc_id, fsbucket) {
err := create_minio_bucket(mc, mc_id, fsbucket)
log.Error("Cannot create bucket: ", fsbucket, ", ", err)
for i := 1; i < 64 && !ok; i = i * 2 {
info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
log.Warn("Cannot upload (retry): ", objectName, ", ", err)
time.Sleep(time.Duration(i) * time.Second)
log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
log.Debug("Successfully uploaded: ", objectName, " of size: ", info.Size)
log.Error("Cannot upload: ", objectName, ", ", err)
log.Error("Cannot upload: ", objectName, ", no client")
var fde FileDownloadedEvt
fde.Filename = new_fn
j, err := jsoniter.Marshal(fde)
log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
var fde FileDownloadedEvt
fde.Filename = new_fn
j, err := jsoniter.Marshal(fde)
log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
log.Debug("Marshal file-collect event ", time.Since(start).String())
for k, v := range topic_list {
var kmsg *KafkaPayload = new(KafkaPayload)
data_out_channel <- kmsg
func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
var f MeasCollecFile
err := xml.Unmarshal(*f_byteValue, &f)
return nil, errors.New("Cannot unmarshal xml-file")
log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
var pmfile PMJsonFile
pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
for _, it := range f.MeasData.MeasInfo {
var mili MeasInfoList
mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
for _, jt := range it.MeasType {
mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
for _, jt := range it.MeasValue {
mv.MeasObjInstID = jt.MeasObjLdn
mv.SuspectFlag = jt.Suspect
if jt.Suspect == "" {
mv.SuspectFlag = "false"
for _, kt := range jt.R {
ni, _ := strconv.Atoi(kt.P)
mr := MeasResults{ni, nv}
mv.MeasResultsList = append(mv.MeasResultsList, mr)
mili.MeasValuesList = append(mili.MeasValuesList, mv)
pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
//TODO: Fill more values
pmfile.Event.CommonEventHeader.Domain = ""                  //xfeh.Domain
pmfile.Event.CommonEventHeader.EventID = ""                 //xfeh.EventID
pmfile.Event.CommonEventHeader.Sequence = 0                 //xfeh.Sequence
pmfile.Event.CommonEventHeader.EventName = ""               //xfeh.EventName
pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
pmfile.Event.CommonEventHeader.ReportingEntityName = ""     //xfeh.ReportingEntityName
pmfile.Event.CommonEventHeader.Priority = ""                //xfeh.Priority
pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
pmfile.Event.CommonEventHeader.Version = ""                 //xfeh.Version
pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
log.Debug("Convert xml to json: ", time.Since(start).String())
json, err := jsoniter.Marshal(pmfile)
log.Debug("Marshal json: ", time.Since(start).String())
return nil, errors.New("Cannot marshal converted json")
func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
log.Info("Type job ", type_id, " started")
filters := make(map[string]Filter)
filterParams_list := make(map[string]FilterMaps)
topic_list := make(map[string]string)
var mc *minio.Client
const mc_id = "mc_" + "start_job_json_file_data"
case job_ctl := <-control_ch:
log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
switch job_ctl.command {
filters[job_ctl.filter.JobId] = job_ctl.filter
log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
tmp_filterParams_list := make(map[string]FilterMaps)
for k, v := range filterParams_list {
tmp_filterParams_list[k] = v
tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
filterParams_list = tmp_filterParams_list
tmp_topic_list := make(map[string]string)
for k, v := range topic_list {
tmp_topic_list[k] = v
tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
topic_list = tmp_topic_list
case "REMOVE-FILTER":
log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
tmp_filterParams_list := make(map[string]FilterMaps)
for k, v := range filterParams_list {
tmp_filterParams_list[k] = v
delete(tmp_filterParams_list, job_ctl.filter.JobId)
filterParams_list = tmp_filterParams_list
tmp_topic_list := make(map[string]string)
for k, v := range topic_list {
tmp_topic_list[k] = v
delete(tmp_topic_list, job_ctl.filter.JobId)
topic_list = tmp_topic_list
case msg := <-data_in_ch:
log.Info("Type job ", type_id, " stopped by channel signal - start_job_json_file_data")
mc, err = create_minio_client(mc_id)
log.Debug("Cannot create minio client for type job: ", type_id)
//TODO: Sort processed file conversions in order (FIFO)
jobLimiterChan <- struct{}{}
go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
case <-time.After(1 * time.Second):
func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
var evt_data FileDownloadedEvt
err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
var reader io.Reader
INPUTBUCKET := "pm-files-json"
if !objectstore {
filename = files_volume + "/" + evt_data.Filename
fi, err := os.Open(filename)
log.Error("File: ", filename, " for type job: ", type_id, " - cannot be opened - discarding message - error details: ", err)
filename = "/" + evt_data.Filename
if !check_minio_bucket(mc, mc_id, INPUTBUCKET) {
log.Error("Bucket not available for reading in type job: ", type_id, " bucket: ", INPUTBUCKET)
tctx := context.Background()
mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
log.Error("Obj: ", filename, " for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
log.Error("Cannot get obj: ", filename, " for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
if strings.HasSuffix(filename, "gz") {
var buf2 bytes.Buffer
errb := gunzipReaderToWriter(&buf2, reader)
log.Error("Cannot decompress file/obj ", filename, " for type job: ", type_id, " - discarding message, error details: ", errb)
log.Debug("Decompress file/obj ", filename, " for type job: ", type_id, " time: ", time.Since(start).String())
d, err := io.ReadAll(reader)
log.Error("Cannot read file/obj: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
log.Debug("Read file/obj: ", filename, " for type job: ", type_id, " time: ", time.Since(start).String())
for k, v := range filterList {
var pmfile PMJsonFile
err = jsoniter.Unmarshal(*data, &pmfile)
log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details: ", err)
var kmsg *KafkaPayload = new(KafkaPayload)
kmsg.msg = new(kafka.Message)
kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
log.Debug("topic: ", topic_list[k])
log.Debug("sourceNameMap: ", v.sourceNameMap)
log.Debug("measObjClassMap: ", v.measObjClassMap)
log.Debug("measObjInstIdsMap: ", v.measObjInstIdsMap)
log.Debug("measTypesMap: ", v.measTypesMap)
b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
log.Info("File/obj ", filename, " for job: ", k, " - empty after filtering, discarding message")
kmsg.topic = topic_list[k]
data_out_channel <- kmsg
func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
j, err := jsoniter.Marshal(&data)
log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
log.Error("Msg could not be marshalled - discarding message, obj: ", resource, ", error details: ", err)
log.Debug("Filtered json obj: ", resource, " len: ", len(j))
func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
if len(sourceNameMap) != 0 {
if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
var temp_mil []MeasInfoList
for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
var cnt_flags []bool
if len(measTypesMap) > 0 {
var temp_mtl []string
for _, v := range zz.MeasTypes.SMeasTypesList {
if measTypesMap[v] {
cnt_flags = append(cnt_flags, true)
temp_mtl = append(temp_mtl, v)
cnt_flags = append(cnt_flags, false)
zz.MeasTypes.SMeasTypesList = temp_mtl
var temp_mvl []MeasValues
for _, yy := range zz.MeasValuesList {
dna := strings.Split(yy.MeasObjInstID, ",")
instName := dna[len(dna)-1]
cls := strings.Split(dna[len(dna)-1], "=")[0]
if len(measObjClassMap) > 0 {
if measObjClassMap[cls] {
if len(measObjInstIdsMap) > 0 {
if measObjInstIdsMap[instName] {
var temp_mrl []MeasResults
for _, v := range yy.MeasResultsList {
if cnt_flags[v.P-1] {
temp_mrl = append(temp_mrl, v)
yy.MeasResultsList = temp_mrl
if keep_class && keep_cntr && keep_inst {
temp_mvl = append(temp_mvl, yy)
zz.MeasValuesList = temp_mvl
temp_mil = append(temp_mil, zz)
if len(temp_mil) == 0 {
log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
log.Debug("Filter: ", time.Since(start).String())
func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
log.Info("Type job ", type_id, " started")
log.Debug("influx job ch ", data_in_ch)
filters := make(map[string]Filter)
filterParams_list := make(map[string]FilterMaps)
influx_job_params := make(map[string]InfluxJobParameters)
var mc *minio.Client
const mc_id = "mc_" + "start_job_json_file_data_influx"
case job_ctl := <-control_ch:
log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
switch job_ctl.command {
//ignore cmd - handled by channel signal
filters[job_ctl.filter.JobId] = job_ctl.filter
log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
log.Debug(job_ctl.filter)
tmp_filterParams_list := make(map[string]FilterMaps)
for k, v := range filterParams_list {
tmp_filterParams_list[k] = v
tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
filterParams_list = tmp_filterParams_list
tmp_influx_job_params := make(map[string]InfluxJobParameters)
for k, v := range influx_job_params {
tmp_influx_job_params[k] = v
tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
influx_job_params = tmp_influx_job_params
case "REMOVE-FILTER":
log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
tmp_filterParams_list := make(map[string]FilterMaps)
for k, v := range filterParams_list {
tmp_filterParams_list[k] = v
delete(tmp_filterParams_list, job_ctl.filter.JobId)
filterParams_list = tmp_filterParams_list
tmp_influx_job_params := make(map[string]InfluxJobParameters)
for k, v := range influx_job_params {
tmp_influx_job_params[k] = v
delete(tmp_influx_job_params, job_ctl.filter.JobId)
influx_job_params = tmp_influx_job_params
case msg := <-data_in_ch:
log.Debug("Data received - influx")
log.Info("Type job ", type_id, " stopped by channel signal - start_job_json_file_data_influx")
mc, err = create_minio_client(mc_id)
log.Debug("Cannot create minio client for type job: ", type_id)
jobLimiterChan <- struct{}{}
go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
case <-time.After(1 * time.Second):
func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
log.Debug("run_json_file_data_job_influx")
var evt_data FileDownloadedEvt
err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
var reader io.Reader
INPUTBUCKET := "pm-files-json"
if !objectstore {
filename = files_volume + "/" + evt_data.Filename
fi, err := os.Open(filename)
log.Error("File: ", filename, " for type job: ", type_id, " - cannot be opened - discarding message - error details: ", err)
filename = "/" + evt_data.Filename
if !check_minio_bucket(mc, mc_id, INPUTBUCKET) {
log.Error("Bucket not available for reading in type job: ", type_id, " bucket: ", INPUTBUCKET)
tctx := context.Background()
mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
log.Error("Obj: ", filename, " for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
log.Error("Cannot get obj: ", filename, " for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
if strings.HasSuffix(filename, "gz") {
var buf2 bytes.Buffer
errb := gunzipReaderToWriter(&buf2, reader)
log.Error("Cannot decompress file/obj ", filename, " for type job: ", type_id, " - discarding message, error details: ", errb)
log.Debug("Decompress file/obj ", filename, " for type job: ", type_id, " time: ", time.Since(start).String())
d, err := io.ReadAll(reader)
log.Error("Cannot read file/obj: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
log.Debug("Read file/obj: ", filename, " for type job: ", type_id, " time: ", time.Since(start).String())
for k, v := range filterList {
var pmfile PMJsonFile
err = jsoniter.Unmarshal(*data, &pmfile)
log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details: ", err)
if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
log.Info("File/obj ", filename, " for job: ", k, " - empty after filtering, discarding message")
fluxParms := influxList[k]
log.Debug("Influxdb params: ", fluxParms)
client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
ctr_names := make(map[string]string)
for cni, cn := range zz.MeasTypes.SMeasTypesList {
ctr_names[strconv.Itoa(cni+1)] = cn
for _, xx := range zz.MeasValuesList {
log.Debug("Measurement: ", xx.MeasObjInstID)
log.Debug("Suspect flag: ", xx.SuspectFlag)
p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
p.AddField("suspectflag", xx.SuspectFlag)
p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
for _, yy := range xx.MeasResultsList {
pi := strconv.Itoa(yy.P)
log.Debug("Counter: ", pn, " Value: ", pv)
pv_i, err := strconv.Atoi(pv)
p.AddField(pn, pv_i)
log.Debug("StartEpochMicrosec from common event header: ", pmfile.Event.CommonEventHeader.StartEpochMicrosec)
log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
err := writeAPI.WritePoint(context.Background(), p)
log.Error("Db write error: ", err)