Documentation of RAN PM
[nonrtric/plt/ranpm.git] / pm-file-converter / main.go
1 //  ============LICENSE_START===============================================
2 //  Copyright (C) 2023 Nordix Foundation. All rights reserved.
3 //  ========================================================================
4 //  Licensed under the Apache License, Version 2.0 (the "License");
5 //  you may not use this file except in compliance with the License.
6 //  You may obtain a copy of the License at
7 //
8 //       http://www.apache.org/licenses/LICENSE-2.0
9 //
10 //  Unless required by applicable law or agreed to in writing, software
11 //  distributed under the License is distributed on an "AS IS" BASIS,
12 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 //  See the License for the specific language governing permissions and
14 //  limitations under the License.
15 //  ============LICENSE_END=================================================
16 //
17
18 package main
19
20 import (
21         "bytes"
22         "compress/gzip"
23         "context"
24         "crypto/tls"
25         "encoding/json"
26         "encoding/xml"
27         "errors"
28         "fmt"
29         "io"
30         "net"
31         "os/signal"
32         "reflect"
33         "strings"
34         "sync"
35         "syscall"
36
37         "net/http"
38         "os"
39         "runtime"
40         "strconv"
41         "time"
42
43         "github.com/google/uuid"
44         "golang.org/x/oauth2/clientcredentials"
45
46         log "github.com/sirupsen/logrus"
47
48         "github.com/gorilla/mux"
49
50         "net/http/pprof"
51
52         "github.com/confluentinc/confluent-kafka-go/kafka"
53         influxdb2 "github.com/influxdata/influxdb-client-go/v2"
54         jsoniter "github.com/json-iterator/go"
55         "github.com/minio/minio-go/v7"
56         "github.com/minio/minio-go/v7/pkg/credentials"
57 )
58
59 //== Constants ==//
60
61 const http_port = 80
62 const https_port = 443
63 const config_file = "application_configuration.json"
64 const server_crt = "server.crt"
65 const server_key = "server.key"
66
67 const producer_name = "kafka-producer"
68
69 const registration_delay_short = 2
70 const registration_delay_long = 120
71
72 const mutexLocked = 1
73
74 const (
75         Init AppStates = iota
76         Running
77         Terminating
78 )
79
80 const reader_queue_length = 100 //Per type job
81 const writer_queue_length = 100 //Per info job
82 const parallelism_limiter = 100 //For all jobs
83
// Optional credential settings, read from the environment at start-up.
// A non-empty CREDS_GRANT_TYPE turns on authorization for outgoing http requests.
var (
	creds_grant_type    = os.Getenv("CREDS_GRANT_TYPE")
	creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
	creds_client_id     = os.Getenv("CREDS_CLIENT_ID")
	creds_service_url   = os.Getenv("AUTH_SERVICE_URL")
)
89
90 //== Types ==//
91
// AppStates enumerates the life-cycle states of the application
// (Init, Running, Terminating).
type AppStates int64
93
// FilterParameters holds the filter lists of an info job, as carried in the
// "filter" object of the info job data.
type FilterParameters struct {
	MeasuredEntityDns []string `json:"measuredEntityDns"`
	MeasTypes         []string `json:"measTypes"`
	MeasObjClass      []string `json:"measObjClass"`
	MeasObjInstIds    []string `json:"measObjInstIds"`
}
100
// InfoJobDataType is the json representation of an info job: the job specific
// data (kafka output topic, db parameters and filter) together with the job
// identity, type identity, update time, owner and target uri.
type InfoJobDataType struct {
	InfoJobData struct {
		KafkaOutputTopic string `json:"kafkaOutputTopic"`

		DbUrl    string `json:"db-url"`
		DbOrg    string `json:"db-org"`
		DbBucket string `json:"db-bucket"`
		DbToken  string `json:"db-token"`

		FilterParams FilterParameters `json:"filter"`
	} `json:"info_job_data"`
	InfoJobIdentity  string `json:"info_job_identity"`
	InfoTypeIdentity string `json:"info_type_identity"`
	LastUpdated      string `json:"last_updated"`
	Owner            string `json:"owner"`
	TargetURI        string `json:"target_uri"`
}
118
// InfoJobRecord is the internal record kept for each info job (see InfoJobs):
// the received job definition, its output topic and a pointer to the
// statistics of the job.
type InfoJobRecord struct {
	job_info     InfoJobDataType
	output_topic string

	statistics *InfoJobStats
}
126
// TypeJobRecord is the internal record kept for each type job (see TypeJobs).
// It holds the info type id and input topic, the channels used to feed and
// control the job and its topic reader, the kafka group/client ids and a
// pointer to the statistics of the type job.
type TypeJobRecord struct {
	InfoType        string
	InputTopic      string
	data_in_channel chan *KafkaPayload // Messages from the topic reader to the job
	reader_control  chan ReaderControl // Commands to the topic reader, e.g. "EXIT"
	job_control     chan JobControl    // Commands to the job, e.g. "REMOVE-FILTER"
	groupId         string
	clientId        string

	statistics *TypeJobStats
}
139
// ReaderControl is the message type for controlling the topic reader.
// The command "EXIT" is sent when a type job is removed.
type ReaderControl struct {
	command string
}
144
// WriterControl is the message type for controlling the topic writer
// (sent on the writer_control channel).
type WriterControl struct {
	command string
}
149
// JobControl is the message type for controlling a type job.
// It carries a command (e.g. "REMOVE-FILTER") and the filter the command
// applies to.
type JobControl struct {
	command string
	filter  Filter
}
155
// KafkaPayload is the unit passed on the data channels between topic reader,
// jobs and topic writer: a kafka message together with a topic and a job id.
// NOTE(review): presumably topic/jobid identify the destination of an outgoing
// message - confirm against the reader/writer implementation.
type KafkaPayload struct {
	msg   *kafka.Message
	topic string
	jobid string
}
161
// FilterMaps holds one string-keyed map per filter category (source name,
// measured object class, measured object instance id and measurement type).
// NOTE(review): presumably used as lookup sets built from FilterParameters -
// confirm against the code that populates them.
type FilterMaps struct {
	sourceNameMap     map[string]bool
	measObjClassMap   map[string]bool
	measObjInstIdsMap map[string]bool
	measTypesMap      map[string]bool
}
168
// InfluxJobParameters holds the db url, organisation, bucket and token of a
// job. The fields mirror the db-* parameters of the info job data.
type InfluxJobParameters struct {
	DbUrl    string
	DbOrg    string
	DbBucket string
	DbToken  string
}
175
// Filter is the per info job filter handed to a type job in a JobControl
// message: the job id, the output topic, the filter maps and the influx
// parameters of the job.
type Filter struct {
	JobId       string
	OutputTopic string
	filter      FilterMaps

	influxParameters InfluxJobParameters
}
183
// InfoJobStats holds the statistics of an info job: the outgoing message
// count and the outgoing data volume.
type InfoJobStats struct {
	out_msg_cnt  int
	out_data_vol int64
}
189
// TypeJobStats holds the statistics of a type job: the incoming message
// count and the incoming data volume.
type TypeJobStats struct {
	in_msg_cnt  int
	in_data_vol int64
}
195
// == API Datatypes ==//

// DataType describes one supported info type as read from the producer
// configuration file: the type id, the kafka input topic and - optionally -
// the type and definition of an input job that is created in ICS to feed
// this type.
//
// The unexported ext_job* fields track the state of that input job and are
// never part of the json representation.
type DataType struct {
	ID                 string `json:"id"`
	KafkaInputTopic    string `json:"kafkaInputTopic"`
	InputJobType       string `json:"inputJobType"`
	InputJobDefinition struct {
		KafkaOutputTopic string `json:"kafkaOutputTopic"`
	} `json:"inputJobDefinition"`

	ext_job         *[]byte // Json body used when the input job was registered
	ext_job_created bool    // True when the input job has been registered
	ext_job_id      string  // Id of the input job
}
210
// DataTypes is the top level object of the producer configuration file,
// holding the list of supported types under the json key "types".
type DataTypes struct {
	ProdDataTypes []DataType `json:"types"`
}
214
// Minio_buckets holds a map of bucket names.
// NOTE(review): presumably records which buckets are known to exist in the
// filestore - confirm against the code using minio_bucketlist.
type Minio_buckets struct {
	Buckets map[string]bool
}
218
219 //== External data types ==//
220
// XmlFileEventHeader is the data type for the xml file download event.
// It describes a pm file: product/vendor, location and compression, source
// name, file format, start/last epoch timestamps, name, change identifier,
// internal location and time zone offset.
type XmlFileEventHeader struct {
	ProductName        string `json:"productName"`
	VendorName         string `json:"vendorName"`
	Location           string `json:"location"`
	Compression        string `json:"compression"`
	SourceName         string `json:"sourceName"`
	FileFormatType     string `json:"fileFormatType"`
	FileFormatVersion  string `json:"fileFormatVersion"`
	StartEpochMicrosec int64  `json:"startEpochMicrosec"`
	LastEpochMicrosec  int64  `json:"lastEpochMicrosec"`
	Name               string `json:"name"`
	ChangeIdentifier   string `json:"changeIdentifier"`
	InternalLocation   string `json:"internalLocation"`
	TimeZoneOffset     string `json:"timeZoneOffset"`
	//ObjectStoreBucket  string `json:"objectStoreBucket"`
}
238
// MeasCollecFile is the data type for the input xml file.
// It mirrors the structure of the measCollecFile root element: a fileHeader,
// the measData with its list of measInfo elements (each with measType and
// measValue lists) and a fileFooter. Each level also captures its own
// character data in a Text field.
type MeasCollecFile struct {
	XMLName        xml.Name `xml:"measCollecFile"`
	Text           string   `xml:",chardata"`
	Xmlns          string   `xml:"xmlns,attr"`
	Xsi            string   `xml:"xsi,attr"`
	SchemaLocation string   `xml:"schemaLocation,attr"`
	// File header: format version, vendor, dn prefix, sender and collection start
	FileHeader struct {
		Text              string `xml:",chardata"`
		FileFormatVersion string `xml:"fileFormatVersion,attr"`
		VendorName        string `xml:"vendorName,attr"`
		DnPrefix          string `xml:"dnPrefix,attr"`
		FileSender        struct {
			Text        string `xml:",chardata"`
			LocalDn     string `xml:"localDn,attr"`
			ElementType string `xml:"elementType,attr"`
		} `xml:"fileSender"`
		MeasCollec struct {
			Text      string `xml:",chardata"`
			BeginTime string `xml:"beginTime,attr"`
		} `xml:"measCollec"`
	} `xml:"fileHeader"`
	// Measurement data: the managed element and its measInfo elements
	MeasData struct {
		Text           string `xml:",chardata"`
		ManagedElement struct {
			Text      string `xml:",chardata"`
			LocalDn   string `xml:"localDn,attr"`
			SwVersion string `xml:"swVersion,attr"`
		} `xml:"managedElement"`
		MeasInfo []struct {
			Text       string `xml:",chardata"`
			MeasInfoId string `xml:"measInfoId,attr"`
			Job        struct {
				Text  string `xml:",chardata"`
				JobId string `xml:"jobId,attr"`
			} `xml:"job"`
			GranPeriod struct {
				Text     string `xml:",chardata"`
				Duration string `xml:"duration,attr"`
				EndTime  string `xml:"endTime,attr"`
			} `xml:"granPeriod"`
			RepPeriod struct {
				Text     string `xml:",chardata"`
				Duration string `xml:"duration,attr"`
			} `xml:"repPeriod"`
			// measType elements, each with a "p" attribute
			MeasType []struct {
				Text string `xml:",chardata"`
				P    string `xml:"p,attr"`
			} `xml:"measType"`
			// measValue elements, each with "r" result elements carrying a "p" attribute
			MeasValue []struct {
				Text       string `xml:",chardata"`
				MeasObjLdn string `xml:"measObjLdn,attr"`
				R          []struct {
					Text string `xml:",chardata"`
					P    string `xml:"p,attr"`
				} `xml:"r"`
				Suspect string `xml:"suspect"`
			} `xml:"measValue"`
		} `xml:"measInfo"`
	} `xml:"measData"`
	// File footer: collection end time
	FileFooter struct {
		Text       string `xml:",chardata"`
		MeasCollec struct {
			Text    string `xml:",chardata"`
			EndTime string `xml:"endTime,attr"`
		} `xml:"measCollec"`
	} `xml:"fileFooter"`
}
307
// Data types for the json file.
// Split into several parts to allow adding/removing items in the lists.

// MeasResults is a single measurement result: the index "p" and the value
// "sValue" as a string.
type MeasResults struct {
	P      int    `json:"p"`
	SValue string `json:"sValue"`
}
314
// MeasValues holds the measurement results of one measured object instance,
// together with its suspect flag.
type MeasValues struct {
	MeasObjInstID   string        `json:"measObjInstId"`
	SuspectFlag     string        `json:"suspectFlag"`
	MeasResultsList []MeasResults `json:"measResults"`
}
320
// SMeasTypes holds a single measurement type name, stored under the json key
// "sMeasTypesList".
type SMeasTypes struct {
	SMeasType string `json:"sMeasTypesList"`
}
324
// MeasInfoList is one entry of the "measInfoList" in the json file: the
// measurement info id, the list of measurement type names and the list of
// measurement values.
type MeasInfoList struct {
	MeasInfoID struct {
		SMeasInfoID string `json:"sMeasInfoId"`
	} `json:"measInfoId"`
	MeasTypes struct {
		SMeasTypesList []string `json:"sMeasTypesList"`
	} `json:"measTypes"`
	MeasValuesList []MeasValues `json:"measValuesList"`
}
334
// PMJsonFile is the top level data type of the json file: an "event" with a
// common event header and the perf3gpp fields holding the measurement data
// collection (granularity period, measured entity and the measInfoList).
type PMJsonFile struct {
	Event struct {
		CommonEventHeader struct {
			Domain                  string `json:"domain"`
			EventID                 string `json:"eventId"`
			Sequence                int    `json:"sequence"`
			EventName               string `json:"eventName"`
			SourceName              string `json:"sourceName"`
			ReportingEntityName     string `json:"reportingEntityName"`
			Priority                string `json:"priority"`
			StartEpochMicrosec      int64  `json:"startEpochMicrosec"`
			LastEpochMicrosec       int64  `json:"lastEpochMicrosec"`
			Version                 string `json:"version"`
			VesEventListenerVersion string `json:"vesEventListenerVersion"`
			TimeZoneOffset          string `json:"timeZoneOffset"`
		} `json:"commonEventHeader"`
		Perf3GppFields struct {
			Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
			MeasDataCollection    struct {
				GranularityPeriod             int            `json:"granularityPeriod"`
				MeasuredEntityUserName        string         `json:"measuredEntityUserName"`
				MeasuredEntityDn              string         `json:"measuredEntityDn"`
				MeasuredEntitySoftwareVersion string         `json:"measuredEntitySoftwareVersion"`
				SMeasInfoList                 []MeasInfoList `json:"measInfoList"`
			} `json:"measDataCollection"`
		} `json:"perf3gppFields"`
	} `json:"event"`
}
363
// FileDownloadedEvt is the data type for the converted json file message.
// It carries only the name of the file.
type FileDownloadedEvt struct {
	Filename string `json:"filename"`
}
368
369 //== Variables ==//
370
371 var AppState = Init
372
373 // Lock for all internal data
374 var datalock sync.Mutex
375
376 var producer_instance_name string = producer_name
377
378 // Keep all info type jobs, key == type id
379 var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
380
381 // Keep all info jobs, key == job id
382 var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
383
384 var InfoTypes DataTypes
385
386 // Limiter - valid for all jobs
387 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
388
389 // TODO: Config param?
390 var bucket_location = "swe"
391
392 var httpclient = &http.Client{}
393
394 // == Env variables ==//
395 var bootstrapserver = os.Getenv("KAFKA_SERVER")
396 var files_volume = os.Getenv("FILES_VOLUME")
397 var ics_server = os.Getenv("ICS")
398 var self = os.Getenv("SELF")
399 var filestore_user = os.Getenv("FILESTORE_USER")
400 var filestore_pwd = os.Getenv("FILESTORE_PWD")
401 var filestore_server = os.Getenv("FILESTORE_SERVER")
402
403 var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
404 var writer_control = make(chan WriterControl, 1)
405
406 var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
407
408 // == Main ==//
409 func main() {
410
411         //log.SetLevel(log.InfoLevel)
412         log.SetLevel(log.TraceLevel)
413
414         log.Info("Server starting...")
415
416         if self == "" {
417                 log.Panic("Env SELF not configured")
418         }
419         if bootstrapserver == "" {
420                 log.Panic("Env KAFKA_SERVER not set")
421         }
422         if ics_server == "" {
423                 log.Panic("Env ICS not set")
424         }
425         if os.Getenv("KP") != "" {
426                 producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
427         }
428
429         rtr := mux.NewRouter()
430         rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
431         rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
432         rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
433         rtr.HandleFunc("/statistics", statistics)
434         rtr.HandleFunc("/logging/{level}", logging_level)
435         rtr.HandleFunc("/logging", logging_level)
436         rtr.HandleFunc("/", alive)
437
438         //For perf/mem profiling
439         rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
440
441         http.Handle("/", rtr)
442
443         http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
444
445         cer, err := tls.LoadX509KeyPair(server_crt, server_key)
446         if err != nil {
447                 log.Error("Cannot load key and cert - %v\n", err)
448                 return
449         }
450         config := &tls.Config{Certificates: []tls.Certificate{cer}}
451         https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
452
453         //TODO: Make http on/off configurable
454         // Run http
455         go func() {
456                 log.Info("Starting http service...")
457                 err := http_server.ListenAndServe()
458                 if err == http.ErrServerClosed { // graceful shutdown
459                         log.Info("http server shutdown...")
460                 } else if err != nil {
461                         log.Error("http server error: %v\n", err)
462                 }
463         }()
464
465         //TODO: Make https on/off configurable
466         //  Run https
467         go func() {
468                 log.Info("Starting https service...")
469                 err := https_server.ListenAndServe()
470                 if err == http.ErrServerClosed { // graceful shutdown
471                         log.Info("https server shutdown...")
472                 } else if err != nil {
473                         log.Error("https server error: %v\n", err)
474                 }
475         }()
476         check_tcp(strconv.Itoa(http_port))
477         check_tcp(strconv.Itoa(https_port))
478
479         go start_topic_writer(writer_control, data_out_channel)
480
481         //Setup proc for periodic type registration
482         var event_chan = make(chan int) //Channel for stopping the proc
483         go periodic_registration(event_chan)
484
485         //Wait for term/int signal do try to shut down gracefully
486         sigs := make(chan os.Signal, 1)
487         signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
488         go func() {
489                 sig := <-sigs
490                 fmt.Printf("Received signal %s - application will terminate\n", sig)
491                 event_chan <- 0 // Stop periodic registration
492                 datalock.Lock()
493                 defer datalock.Unlock()
494                 AppState = Terminating
495                 http_server.Shutdown(context.Background())
496                 https_server.Shutdown(context.Background())
497                 // Stopping jobs
498                 for key, _ := range TypeJobs {
499                         log.Info("Stopping type job:", key)
500                         for _, dp := range InfoTypes.ProdDataTypes {
501                                 if key == dp.ID {
502                                         remove_type_job(dp)
503                                 }
504                         }
505                 }
506         }()
507
508         AppState = Running
509
510         //Wait until all go routines has exited
511         runtime.Goexit()
512
513         fmt.Println("main routine exit")
514         fmt.Println("server stopped")
515 }
516
517 func check_tcp(port string) {
518         log.Info("Checking tcp port: ", port)
519         for true {
520                 address := net.JoinHostPort("localhost", port)
521                 // 3 second timeout
522                 conn, err := net.DialTimeout("tcp", address, 3*time.Second)
523                 if err != nil {
524                         log.Info("Checking tcp port: ", port, " failed, retrying...")
525                 } else {
526                         if conn != nil {
527                                 log.Info("Checking tcp port: ", port, " - OK")
528                                 _ = conn.Close()
529                                 return
530                         } else {
531                                 log.Info("Checking tcp port: ", port, " failed, retrying...")
532                         }
533                 }
534         }
535 }
536
537 //== Core functions ==//
538
539 // Run periodic registration of producers
540 func periodic_registration(evtch chan int) {
541         var delay int = 1
542         for {
543                 select {
544                 case msg := <-evtch:
545                         if msg == 0 { // Stop thread
546                                 return
547                         }
548                 case <-time.After(time.Duration(delay) * time.Second):
549                         ok := register_producer()
550                         if ok {
551                                 delay = registration_delay_long
552                         } else {
553                                 if delay < registration_delay_long {
554                                         delay += registration_delay_short
555                                 } else {
556                                         delay = registration_delay_short
557                                 }
558                         }
559                 }
560         }
561 }
562
563 func register_producer() bool {
564
565         log.Info("Registering producer: ", producer_instance_name)
566
567         file, err := os.ReadFile(config_file)
568         if err != nil {
569                 log.Error("Cannot read config file: ", config_file)
570                 log.Error("Registering producer: ", producer_instance_name, " - failed")
571                 return false
572         }
573         data := DataTypes{}
574         err = jsoniter.Unmarshal([]byte(file), &data)
575         if err != nil {
576                 log.Error("Cannot parse config file: ", config_file)
577                 log.Error("Registering producer: ", producer_instance_name, " - failed")
578                 return false
579         }
580         var new_type_names []string
581
582         for i := 0; i < len(data.ProdDataTypes); i++ {
583                 t1 := make(map[string]interface{})
584                 t2 := make(map[string]interface{})
585
586                 t2["schema"] = "http://json-schema.org/draft-07/schema#"
587                 t2["title"] = data.ProdDataTypes[i].ID
588                 t2["description"] = data.ProdDataTypes[i].ID
589                 t2["type"] = "object"
590
591                 t1["info_job_data_schema"] = t2
592
593                 json, err := json.Marshal(t1)
594                 if err != nil {
595                         log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
596                         log.Error("Registering producer: ", producer_instance_name, " - failed")
597                         return false
598                 } else {
599                         //TODO: http/https should be configurable
600                         ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
601                         if !ok {
602                                 log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
603                                 log.Error("Registering producer: ", producer_instance_name, " - failed")
604                                 return false
605                         }
606                         new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
607                 }
608
609         }
610
611         log.Debug("Registering types: ", new_type_names)
612         m := make(map[string]interface{})
613         m["supported_info_types"] = new_type_names
614         //TODO: http/https should be configurable
615         m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
616         //TODO: http/https should be configurable
617         m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
618
619         json, err := json.Marshal(m)
620         if err != nil {
621                 log.Error("Cannot create json for producer: ", producer_instance_name)
622                 log.Error("Registering producer: ", producer_instance_name, " - failed")
623                 return false
624         }
625         ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
626         if !ok {
627                 log.Error("Cannot register producer: ", producer_instance_name)
628                 log.Error("Registering producer: ", producer_instance_name, " - failed")
629                 return false
630         }
631         datalock.Lock()
632         defer datalock.Unlock()
633
634         var current_type_names []string
635         for _, v := range InfoTypes.ProdDataTypes {
636                 current_type_names = append(current_type_names, v.ID)
637                 if contains_str(new_type_names, v.ID) {
638                         //Type exist
639                         log.Debug("Type ", v.ID, " exists")
640                         create_ext_job(v)
641                 } else {
642                         //Type is removed
643                         log.Info("Removing type job for type: ", v.ID, " Type not in configuration")
644                         remove_type_job(v)
645                 }
646         }
647
648         for _, v := range data.ProdDataTypes {
649                 if contains_str(current_type_names, v.ID) {
650                         //Type exist
651                         log.Debug("Type ", v.ID, " exists")
652                         create_ext_job(v)
653                 } else {
654                         //Type is new
655                         log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
656                         start_type_job(v)
657                 }
658         }
659
660         InfoTypes = data
661         log.Debug("Datatypes: ", InfoTypes)
662
663         log.Info("Registering producer: ", producer_instance_name, " - OK")
664         return true
665 }
666
667 func remove_type_job(dp DataType) {
668         log.Info("Removing type job: ", dp.ID)
669         j, ok := TypeJobs[dp.ID]
670         if ok {
671                 j.reader_control <- ReaderControl{"EXIT"}
672         }
673
674         if dp.ext_job_created == true {
675                 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
676                 //TODO: http/https should be configurable
677                 ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
678                 if !ok {
679                         log.Error("Cannot delete job: ", dp.ext_job_id)
680                 }
681                 dp.ext_job_created = false
682                 dp.ext_job = nil
683         }
684
685 }
686
687 func start_type_job(dp DataType) {
688         log.Info("Starting type job: ", dp.ID)
689         job_record := TypeJobRecord{}
690
691         job_record.job_control = make(chan JobControl, 1)
692         job_record.reader_control = make(chan ReaderControl, 1)
693         job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
694         job_record.InfoType = dp.ID
695         job_record.InputTopic = dp.KafkaInputTopic
696         job_record.groupId = "kafka-procon-" + dp.ID
697         job_record.clientId = dp.ID + "-" + os.Getenv("KP")
698         var stats TypeJobStats
699         job_record.statistics = &stats
700
701         switch dp.ID {
702         case "xml-file-data-to-filestore":
703                 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
704         case "xml-file-data":
705                 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
706         case "json-file-data-from-filestore":
707                 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
708         case "json-file-data":
709                 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
710         case "json-file-data-from-filestore-to-influx":
711                 go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
712         // case "json-data-to-influx":
713         //      go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel)
714
715         default:
716         }
717
718         go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)
719
720         TypeJobs[dp.ID] = job_record
721         log.Debug("Type job input type: ", dp.InputJobType)
722         create_ext_job(dp)
723 }
724
725 func create_ext_job(dp DataType) {
726         if dp.InputJobType != "" {
727                 jb := make(map[string]interface{})
728                 jb["info_type_id"] = dp.InputJobType
729                 jb["job_owner"] = "console" //TODO:
730                 jb["status_notification_uri"] = "http://callback:80/post"
731                 jb1 := make(map[string]interface{})
732                 jb["job_definition"] = jb1
733                 jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic
734
735                 json, err := json.Marshal(jb)
736                 dp.ext_job_created = false
737                 dp.ext_job = nil
738                 if err != nil {
739                         log.Error("Cannot create json for type: ", dp.InputJobType)
740                         return
741                 }
742
743                 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
744                 //TODO: http/https should be configurable
745                 ok := false
746                 for !ok {
747                         ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
748                         if !ok {
749                                 log.Error("Cannot register job: ", dp.InputJobType)
750                                 //TODO: Restart after long time?
751                         }
752                 }
753                 log.Debug("Registered job ok: ", dp.InputJobType)
754                 dp.ext_job_created = true
755                 dp.ext_job = &json
756         }
757 }
758
759 func remove_info_job(jobid string) {
760         log.Info("Removing info job: ", jobid)
761         filter := Filter{}
762         filter.JobId = jobid
763         jc := JobControl{}
764         jc.command = "REMOVE-FILTER"
765         jc.filter = filter
766         infoJob := InfoJobs[jobid]
767         typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
768         typeJob.job_control <- jc
769         delete(InfoJobs, jobid)
770
771 }
772
773 // == Helper functions ==//
774
// Function to check the status of a mutex lock.
//
// Returns true if the mutex could not be acquired at the time of the call,
// false otherwise. The check is done with TryLock instead of reading the
// private state of the mutex via reflection, since the internal layout of
// sync.Mutex is not stable between Go releases.
//
// The mutex is held for a very short moment when it was found unlocked. As
// with any such check, the result may be outdated as soon as it is returned.
func MutexLocked(m *sync.Mutex) bool {
	if m.TryLock() {
		m.Unlock()
		return false
	}
	return true
}
780
// contains_str reports whether the string e occurs in the slice s.
// A nil or empty slice never contains anything.
func contains_str(s []string, e string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == e {
			return true
		}
	}
	return false
}
790
791 // Send a http request with json (json may be nil)
792 func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
793
794         // set the HTTP method, url, and request body
795         var req *http.Request
796         var err error
797         if json == nil {
798                 req, err = http.NewRequest(method, url, http.NoBody)
799         } else {
800                 req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
801                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
802         }
803         if err != nil {
804                 log.Error("Cannot create http request, method: ", method, " url: ", url)
805                 return false
806         }
807
808         if useAuth {
809                 token, err := fetch_token()
810                 if err != nil {
811                         log.Error("Cannot fetch token for http request: ", err)
812                         return false
813                 }
814                 req.Header.Set("Authorization", "Bearer "+token.TokenValue)
815         }
816
817         log.Debug("HTTP request: ", req)
818
819         log.Debug("Sending http request")
820         resp, err2 := httpclient.Do(req)
821         if err2 != nil {
822                 log.Error("Http request error: ", err2)
823                 log.Error("Cannot send http request method: ", method, " url: ", url)
824         } else {
825                 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
826                         log.Debug("Accepted http status: ", resp.StatusCode)
827                         resp.Body.Close()
828                         return true
829                 }
830                 log.Debug("HTTP resp: ", resp)
831                 resp.Body.Close()
832         }
833         return false
834 }
835
836 // // Send a http request with json (json may be nil)
837 // func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
838
839 //      // set the HTTP method, url, and request body
840 //      var req *http.Request
841 //      var err error
842 //      if json == nil {
843 //              req, err = http.NewRequest(method, url, http.NoBody)
844 //      } else {
845 //              req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
846 //              req.Header.Set("Content-Type", "application/json; charset=utf-8")
847 //      }
848 //      if err != nil {
849 //              log.Error("Cannot create http request, method: ", method, " url: ", url)
850 //              return false
851 //      }
852
853 //      if useAuth {
854 //              token, err := fetch_token()
855 //              if err != nil {
856 //                      log.Error("Cannot fetch token for http request: ", err)
857 //                      return false
858 //              }
859 //              req.Header.Set("Authorization", "Bearer "+token.TokenValue)
860 //      }
861
862 //      log.Debug("HTTP request: ", req)
863
864 //      retries := 1
865 //      if retry {
866 //              retries = 5
867 //      }
868 //      sleep_time := 1
869 //      for i := retries; i > 0; i-- {
870 //              log.Debug("Sending http request")
871 //              resp, err2 := httpclient.Do(req)
872 //              if err2 != nil {
873 //                      log.Error("Http request error: ", err2)
874 //                      log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i-1)
875
876 //                      time.Sleep(time.Duration(sleep_time) * time.Second)
877 //                      sleep_time = 2 * sleep_time
878 //              } else {
879 //                      if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
880 //                              log.Debug("Accepted http status: ", resp.StatusCode)
881 //                              resp.Body.Close()
882 //                              return true
883 //                      }
884 //                      log.Debug("HTTP resp: ", resp)
885 //                      resp.Body.Close()
886 //              }
887 //      }
888 //      return false
889 // }
890
891 // // Send a http request with json (json may be nil)
892 // func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
893 //      // initialize http client
894 //      client := &http.Client{}
895
896 //      // set the HTTP method, url, and request body
897 //      var req *http.Request
898 //      var err error
899 //      if json == nil {
900 //              req, err = http.NewRequest(method, url, http.NoBody)
901 //      } else {
902 //              req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
903 //              req.Header.Set("Content-Type", "application/json; charset=utf-8")
904 //      }
905 //      if err != nil {
906 //              log.Error("Cannot create http request method: ", method, " url: ", url)
907 //              return false
908 //      }
909
910 //      useAuth = false
911 //      if useAuth {
912 //              token, err := fetch_token()
913 //              if err != nil {
914 //                      log.Error("Cannot fetch token for http request: ", err)
915 //                      return false
916 //              }
917 //              req.Header.Add("Authorization", "Bearer "+token.TokenValue)
918 //      }
919 //      log.Debug("HTTP request: ", req)
920
921 //      b, berr := io.ReadAll(req.Body)
922 //      if berr == nil {
923 //              log.Debug("HTTP request body length: ", len(b))
924 //      } else {
925 //              log.Debug("HTTP request - cannot check body length: ", berr)
926 //      }
927 //      if json == nil {
928 //              log.Debug("HTTP request null json")
929 //      } else {
930 //              log.Debug("HTTP request json: ", string(json))
931 //      }
932 //      requestDump, cerr := httputil.DumpRequestOut(req, true)
933 //      if cerr != nil {
934 //              fmt.Println(cerr)
935 //      }
936 //      fmt.Println(string(requestDump))
937
938 //      retries := 1
939 //      if retry {
940 //              retries = 5
941 //      }
942 //      sleep_time := 1
943 //      for i := retries; i > 0; i-- {
944 //              resp, err2 := client.Do(req)
945 //              if err2 != nil {
946 //                      log.Error("Http request error: ", err2)
947 //                      log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i)
948
949 //                      time.Sleep(time.Duration(sleep_time) * time.Second)
950 //                      sleep_time = 2 * sleep_time
951 //              } else {
952 //                      if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
953 //                              log.Debug("Accepted http status: ", resp.StatusCode)
954 //                              defer resp.Body.Close()
955 //                              return true
956 //                      }
957 //              }
958 //      }
959 //      return false
960 // }
961
962 func fetch_token() (*kafka.OAuthBearerToken, error) {
963         log.Debug("Get token inline")
964         conf := &clientcredentials.Config{
965                 ClientID:     creds_client_id,
966                 ClientSecret: creds_client_secret,
967                 TokenURL:     creds_service_url,
968         }
969         token, err := conf.Token(context.Background())
970         if err != nil {
971                 log.Warning("Cannot fetch access token: ", err)
972                 return nil, err
973         }
974         extensions := map[string]string{}
975         log.Debug("=====================================================")
976         log.Debug("token: ", token)
977         log.Debug("=====================================================")
978         log.Debug("TokenValue: ", token.AccessToken)
979         log.Debug("=====================================================")
980         log.Debug("Expiration: ", token.Expiry)
981         t := token.Expiry
982         // t := token.Expiry.Add(-time.Minute)
983         // log.Debug("Modified expiration: ", t)
984         oauthBearerToken := kafka.OAuthBearerToken{
985                 TokenValue: token.AccessToken,
986                 Expiration: t,
987                 Extensions: extensions,
988         }
989
990         return &oauthBearerToken, nil
991 }
992
993 // Function to print memory details
994 // https://pkg.go.dev/runtime#MemStats
995 func PrintMemUsage() {
996         if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
997                 var m runtime.MemStats
998                 runtime.ReadMemStats(&m)
999                 fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
1000                 fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
1001                 fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
1002                 fmt.Printf("\tNumGC = %v\n", m.NumGC)
1003                 fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
1004                 fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
1005                 fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
1006                 fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
1007                 fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
1008                 fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
1009                 fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
1010         }
1011 }
1012
// bToMb converts a byte count to whole mebibytes, rounding down.
func bToMb(b uint64) uint64 {
	const mebibyteShift = 20 // 2^20 bytes == 1024 * 1024 bytes
	return b >> mebibyteShift
}
1016
1017 func generate_uuid_from_type(s string) string {
1018         if len(s) > 16 {
1019                 s = s[:16]
1020         }
1021         for len(s) < 16 {
1022                 s = s + "0"
1023         }
1024         b := []byte(s)
1025         b = b[:16]
1026         uuid, _ := uuid.FromBytes(b)
1027         return uuid.String()
1028 }
1029
// gzipWrite compresses *data with gzip (level gzip.BestSpeed) and writes the
// complete gzip stream to w.
//
// The error from closing the gzip writer is returned to the caller: Close is
// what flushes the buffered compressed data and writes the gzip footer, so a
// failure there means the stream in w is incomplete.
func gzipWrite(w io.Writer, data *[]byte) error {
	gw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
	if err != nil {
		return err
	}
	if _, err := gw.Write(*data); err != nil {
		// Best effort close, the write error is the one worth reporting.
		gw.Close()
		return err
	}
	return gw.Close()
}
1041
// gunzipReaderToWriter reads a gzip stream from data, decompresses it fully
// into memory and then writes the decompressed bytes to w in a single call.
// Nothing is written to w if the stream cannot be opened or decoded.
func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
	zr, err := gzip.NewReader(data)
	if err != nil {
		return err
	}
	defer zr.Close()

	plain, err := io.ReadAll(zr)
	if err == nil {
		_, err = w.Write(plain)
	}
	return err
}
1060
1061 func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
1062         tctx := context.Background()
1063         err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
1064         if err != nil {
1065                 // Check to see if we already own this bucket (which happens if you run this twice)
1066                 exists, errBucketExists := mc.BucketExists(tctx, bucket)
1067                 if errBucketExists == nil && exists {
1068                         log.Debug("Already own bucket:", bucket)
1069                         add_bucket(client_id, bucket)
1070                         return nil
1071                 } else {
1072                         log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
1073                         return err
1074                 }
1075         }
1076         log.Debug("Successfully created bucket: ", bucket)
1077         add_bucket(client_id, bucket)
1078         return nil
1079 }
1080
1081 func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
1082         ok := bucket_exist(client_id, bucket)
1083         if ok {
1084                 return true
1085         }
1086         tctx := context.Background()
1087         exists, err := mc.BucketExists(tctx, bucket)
1088         if err == nil && exists {
1089                 log.Debug("Already own bucket:", bucket)
1090                 return true
1091         }
1092         log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
1093         return false
1094 }
1095
1096 func add_bucket(minio_id string, bucket string) {
1097         datalock.Lock()
1098         defer datalock.Unlock()
1099
1100         b, ok := minio_bucketlist[minio_id]
1101         if !ok {
1102                 b = Minio_buckets{}
1103                 b.Buckets = make(map[string]bool)
1104         }
1105         b.Buckets[bucket] = true
1106         minio_bucketlist[minio_id] = b
1107 }
1108
1109 func bucket_exist(minio_id string, bucket string) bool {
1110         datalock.Lock()
1111         defer datalock.Unlock()
1112
1113         b, ok := minio_bucketlist[minio_id]
1114         if !ok {
1115                 return false
1116         }
1117         _, ok = b.Buckets[bucket]
1118         return ok
1119 }
1120
1121 //== http api functions ==//
1122
1123 // create/update job
1124 func create_job(w http.ResponseWriter, req *http.Request) {
1125         log.Debug("Create job, http method: ", req.Method)
1126         if req.Method != http.MethodPost {
1127                 log.Error("Create job, http method not allowed")
1128                 w.WriteHeader(http.StatusMethodNotAllowed)
1129                 return
1130         }
1131         ct := req.Header.Get("Content-Type")
1132         if ct != "application/json" {
1133                 log.Error("Create job, bad content type")
1134                 http.Error(w, "Bad content type", http.StatusBadRequest)
1135                 return
1136         }
1137
1138         var t InfoJobDataType
1139         err := json.NewDecoder(req.Body).Decode(&t)
1140         if err != nil {
1141                 log.Error("Create job, cannot parse json,", err)
1142                 http.Error(w, "Cannot parse json", http.StatusBadRequest)
1143                 return
1144         }
1145         log.Debug("Creating job, id: ", t.InfoJobIdentity)
1146         datalock.Lock()
1147         defer datalock.Unlock()
1148
1149         job_id := t.InfoJobIdentity
1150         job_record, job_found := InfoJobs[job_id]
1151         type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
1152         if !job_found {
1153                 if !found_type {
1154                         log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1155                         http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1156                         return
1157                 }
1158         } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
1159                 log.Error("Job cannot change type")
1160                 http.Error(w, "Job cannot change type", http.StatusBadRequest)
1161                 return
1162         } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
1163                 log.Error("Job cannot change topic")
1164                 http.Error(w, "Job cannot change topic", http.StatusBadRequest)
1165                 return
1166         } else if !found_type {
1167                 //Should never happen, if the type is removed then job is stopped
1168                 log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1169                 http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1170                 return
1171         }
1172
1173         //TODO: Verify that job contains enough parameters...
1174
1175         if !job_found {
1176                 job_record = InfoJobRecord{}
1177                 job_record.job_info = t
1178                 output_topic := t.InfoJobData.KafkaOutputTopic
1179                 job_record.output_topic = t.InfoJobData.KafkaOutputTopic
1180                 log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic)
1181
1182                 var stats InfoJobStats
1183                 job_record.statistics = &stats
1184
1185                 filter := Filter{}
1186                 filter.JobId = job_id
1187                 filter.OutputTopic = job_record.output_topic
1188
1189                 jc := JobControl{}
1190
1191                 jc.command = "ADD-FILTER"
1192
1193                 //TODO: Refactor
1194                 if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1195                         fm := FilterMaps{}
1196                         fm.sourceNameMap = make(map[string]bool)
1197                         fm.measObjClassMap = make(map[string]bool)
1198                         fm.measObjInstIdsMap = make(map[string]bool)
1199                         fm.measTypesMap = make(map[string]bool)
1200                         if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
1201                                 for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
1202                                         fm.sourceNameMap[v] = true
1203                                 }
1204                         }
1205                         if t.InfoJobData.FilterParams.MeasObjClass != nil {
1206                                 for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
1207                                         fm.measObjClassMap[v] = true
1208                                 }
1209                         }
1210                         if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
1211                                 for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
1212                                         fm.measObjInstIdsMap[v] = true
1213                                 }
1214                         }
1215                         if t.InfoJobData.FilterParams.MeasTypes != nil {
1216                                 for _, v := range t.InfoJobData.FilterParams.MeasTypes {
1217                                         fm.measTypesMap[v] = true
1218                                 }
1219                         }
1220                         filter.filter = fm
1221                 }
1222                 if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1223                         influxparam := InfluxJobParameters{}
1224                         influxparam.DbUrl = t.InfoJobData.DbUrl
1225                         influxparam.DbOrg = t.InfoJobData.DbOrg
1226                         influxparam.DbBucket = t.InfoJobData.DbBucket
1227                         influxparam.DbToken = t.InfoJobData.DbToken
1228                         filter.influxParameters = influxparam
1229                 }
1230
1231                 jc.filter = filter
1232                 InfoJobs[job_id] = job_record
1233
1234                 type_job_record.job_control <- jc
1235
1236         } else {
1237                 //TODO
1238                 //Update job
1239         }
1240 }
1241
1242 // delete job
1243 func delete_job(w http.ResponseWriter, req *http.Request) {
1244         if req.Method != http.MethodDelete {
1245                 w.WriteHeader(http.StatusMethodNotAllowed)
1246                 return
1247         }
1248         datalock.Lock()
1249         defer datalock.Unlock()
1250
1251         vars := mux.Vars(req)
1252
1253         if id, ok := vars["job_id"]; ok {
1254                 if _, ok := InfoJobs[id]; ok {
1255                         remove_info_job(id)
1256                         w.WriteHeader(http.StatusNoContent)
1257                         log.Info("Job ", id, " deleted")
1258                         return
1259                 }
1260         }
1261         w.WriteHeader(http.StatusNotFound)
1262 }
1263
1264 // job supervision
1265 func supervise_job(w http.ResponseWriter, req *http.Request) {
1266         if req.Method != http.MethodGet {
1267                 w.WriteHeader(http.StatusMethodNotAllowed)
1268                 return
1269         }
1270         datalock.Lock()
1271         defer datalock.Unlock()
1272
1273         vars := mux.Vars(req)
1274
1275         log.Debug("Supervising, job: ", vars["job_id"])
1276         if id, ok := vars["job_id"]; ok {
1277                 if _, ok := InfoJobs[id]; ok {
1278                         log.Debug("Supervision ok, job", id)
1279                         return
1280                 }
1281         }
1282         w.WriteHeader(http.StatusNotFound)
1283 }
1284
// producer supervision
//
// supervise_producer answers 200 to a GET request and 405 to any other
// http method. No body is written.
func supervise_producer(w http.ResponseWriter, req *http.Request) {
	status := http.StatusOK
	if req.Method != http.MethodGet {
		status = http.StatusMethodNotAllowed
	}
	w.WriteHeader(status)
}
1294
1295 // producer statictics, all jobs
1296 func statistics(w http.ResponseWriter, req *http.Request) {
1297         if req.Method != http.MethodGet {
1298                 w.WriteHeader(http.StatusMethodNotAllowed)
1299                 return
1300         }
1301         m := make(map[string]interface{})
1302         log.Debug("producer statictics, locked? ", MutexLocked(&datalock))
1303         datalock.Lock()
1304         defer datalock.Unlock()
1305         req.Header.Set("Content-Type", "application/json; charset=utf-8")
1306         m["number-of-jobs"] = len(InfoJobs)
1307         m["number-of-types"] = len(InfoTypes.ProdDataTypes)
1308         qm := make(map[string]interface{})
1309         m["jobs"] = qm
1310         for key, elem := range InfoJobs {
1311                 jm := make(map[string]interface{})
1312                 qm[key] = jm
1313                 jm["type"] = elem.job_info.InfoTypeIdentity
1314                 typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
1315                 jm["groupId"] = typeJob.groupId
1316                 jm["clientID"] = typeJob.clientId
1317                 jm["input topic"] = typeJob.InputTopic
1318                 jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
1319                 jm["output topic"] = elem.output_topic
1320                 jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
1321                 jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
1322                 jm["msg_out (job)"] = elem.statistics.out_msg_cnt
1323
1324         }
1325         json, err := json.Marshal(m)
1326         if err != nil {
1327                 w.WriteHeader(http.StatusInternalServerError)
1328                 log.Error("Cannot marshal statistics json")
1329                 return
1330         }
1331         _, err = w.Write(json)
1332         if err != nil {
1333                 w.WriteHeader(http.StatusInternalServerError)
1334                 log.Error("Cannot send statistics json")
1335                 return
1336         }
1337 }
1338
// Simple alive check
//
// alive deliberately writes nothing: a handler that returns without writing
// makes net/http reply with status 200 and an empty body, for any http method.
func alive(_ http.ResponseWriter, _ *http.Request) {
}
1343
1344 // Get/Set logging level
1345 func logging_level(w http.ResponseWriter, req *http.Request) {
1346         vars := mux.Vars(req)
1347         if level, ok := vars["level"]; ok {
1348                 if req.Method == http.MethodPut {
1349                         switch level {
1350                         case "trace":
1351                                 log.SetLevel(log.TraceLevel)
1352                         case "debug":
1353                                 log.SetLevel(log.DebugLevel)
1354                         case "info":
1355                                 log.SetLevel(log.InfoLevel)
1356                         case "warn":
1357                                 log.SetLevel(log.WarnLevel)
1358                         case "error":
1359                                 log.SetLevel(log.ErrorLevel)
1360                         case "fatal":
1361                                 log.SetLevel(log.FatalLevel)
1362                         case "panic":
1363                                 log.SetLevel(log.PanicLevel)
1364                         default:
1365                                 w.WriteHeader(http.StatusNotFound)
1366                         }
1367                 } else {
1368                         w.WriteHeader(http.StatusMethodNotAllowed)
1369                 }
1370         } else {
1371                 if req.Method == http.MethodGet {
1372                         msg := "none"
1373                         if log.IsLevelEnabled(log.PanicLevel) {
1374                                 msg = "panic"
1375                         } else if log.IsLevelEnabled(log.FatalLevel) {
1376                                 msg = "fatal"
1377                         } else if log.IsLevelEnabled(log.ErrorLevel) {
1378                                 msg = "error"
1379                         } else if log.IsLevelEnabled(log.WarnLevel) {
1380                                 msg = "warn"
1381                         } else if log.IsLevelEnabled(log.InfoLevel) {
1382                                 msg = "info"
1383                         } else if log.IsLevelEnabled(log.DebugLevel) {
1384                                 msg = "debug"
1385                         } else if log.IsLevelEnabled(log.TraceLevel) {
1386                                 msg = "trace"
1387                         }
1388                         w.Header().Set("Content-Type", "application/text")
1389                         w.Write([]byte(msg))
1390                 } else {
1391                         w.WriteHeader(http.StatusMethodNotAllowed)
1392                 }
1393         }
1394 }
1395
1396 func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
1397
1398         log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
1399
1400         topic_ok := false
1401         var c *kafka.Consumer = nil
1402         running := true
1403
1404         for topic_ok == false {
1405
1406                 select {
1407                 case reader_ctrl := <-control_ch:
1408                         if reader_ctrl.command == "EXIT" {
1409                                 log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1410                                 //TODO: Stop consumer if present?
1411                                 data_ch <- nil //Signal to job handler
1412                                 running = false
1413                                 return
1414                         }
1415                 case <-time.After(1 * time.Second):
1416                         if !running {
1417                                 return
1418                         }
1419                         if c == nil {
1420                                 c = create_kafka_consumer(type_id, gid, cid)
1421                                 if c == nil {
1422                                         log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
1423                                 } else {
1424                                         log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
1425                                 }
1426                         }
1427                         if c != nil && topic_ok == false {
1428                                 err := c.SubscribeTopics([]string{topic}, nil)
1429                                 if err != nil {
1430                                         log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying --  error details: ", err)
1431                                 } else {
1432                                         log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
1433                                         topic_ok = true
1434                                 }
1435                         }
1436                 }
1437         }
1438         log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
1439
1440         var event_chan = make(chan int)
1441         go func() {
1442                 for {
1443                         select {
1444                         case evt := <-c.Events():
1445                                 switch evt.(type) {
1446                                 case kafka.OAuthBearerTokenRefresh:
1447                                         log.Debug("New consumer token needed: ", evt)
1448                                         token, err := fetch_token()
1449                                         if err != nil {
1450                                                 log.Warning("Cannot cannot fetch token: ", err)
1451                                                 c.SetOAuthBearerTokenFailure(err.Error())
1452                                         } else {
1453                                                 setTokenError := c.SetOAuthBearerToken(*token)
1454                                                 if setTokenError != nil {
1455                                                         log.Warning("Cannot cannot set token: ", setTokenError)
1456                                                         c.SetOAuthBearerTokenFailure(setTokenError.Error())
1457                                                 }
1458                                         }
1459                                 default:
1460                                         //TODO: Handle these?
1461                                         log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
1462                                 }
1463
1464                         case msg := <-event_chan:
1465                                 if msg == 0 {
1466                                         return
1467                                 }
1468                         case <-time.After(1 * time.Second):
1469                                 if !running {
1470                                         return
1471                                 }
1472                         }
1473                 }
1474         }()
1475
1476         go func() {
1477                 for {
1478                         //maxDur := 1 * time.Second
1479                         for {
1480                                 select {
1481                                 case reader_ctrl := <-control_ch:
1482                                         if reader_ctrl.command == "EXIT" {
1483                                                 event_chan <- 0
1484                                                 log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1485                                                 data_ch <- nil //Signal to job handler
1486                                                 defer c.Close()
1487                                                 return
1488                                         }
1489                                 default:
1490
1491                                         ev := c.Poll(1000)
1492                                         if ev == nil {
1493                                                 log.Debug("Topic Reader for type: ", type_id, "  Nothing to consume on topic: ", topic)
1494                                                 continue
1495                                         }
1496                                         switch e := ev.(type) {
1497                                         case *kafka.Message:
1498                                                 var kmsg KafkaPayload
1499                                                 kmsg.msg = e
1500
1501                                                 c.Commit() //TODO: Ok?
1502
1503                                                 //TODO: Check for exception
1504                                                 data_ch <- &kmsg
1505                                                 stats.in_msg_cnt++
1506                                                 log.Debug("Reader msg: ", &kmsg)
1507                                                 log.Debug("Reader - data_ch ", data_ch)
1508                                         case kafka.Error:
1509                                                 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
1510
1511                                         case kafka.OAuthBearerTokenRefresh:
1512                                                 log.Debug("New consumer token needed: ", ev)
1513                                                 token, err := fetch_token()
1514                                                 if err != nil {
1515                                                         log.Warning("Cannot cannot fetch token: ", err)
1516                                                         c.SetOAuthBearerTokenFailure(err.Error())
1517                                                 } else {
1518                                                         setTokenError := c.SetOAuthBearerToken(*token)
1519                                                         if setTokenError != nil {
1520                                                                 log.Warning("Cannot cannot set token: ", setTokenError)
1521                                                                 c.SetOAuthBearerTokenFailure(setTokenError.Error())
1522                                                         }
1523                                                 }
1524                                         default:
1525                                                 fmt.Printf("Ignored %v\n", e)
1526                                         }
1527
1528                                         // orig code
1529                                         // msg, err := c.ReadMessage(maxDur)
1530                                         // if err == nil {
1531                                         //      var kmsg KafkaPayload
1532                                         //      kmsg.msg = msg
1533
1534                                         //      c.Commit() //TODO: Ok?
1535
1536                                         //      //TODO: Check for exception
1537                                         //      data_ch <- &kmsg
1538                                         //      stats.in_msg_cnt++
1539                                         //      log.Debug("Reader msg: ", &kmsg)
1540                                         //      log.Debug("Reader - data_ch ", data_ch)
1541                                         // } else {
1542                                         //      log.Debug("Topic Reader for type: ", type_id, "  Nothing to consume on topic: ", topic, ", reason: ", err)
1543                                         // }
1544
1545                                 }
1546                         }
1547                 }
1548         }()
1549 }
1550
1551 func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
1552
1553         var kafka_producer *kafka.Producer
1554
1555         running := true
1556         log.Info("Topic writer starting")
1557
1558         // Wait for kafka producer to become available - and be prepared to exit the writer
1559         for kafka_producer == nil {
1560                 select {
1561                 case writer_ctl := <-control_ch:
1562                         if writer_ctl.command == "EXIT" {
1563                                 //ignore cmd
1564                         }
1565                 default:
1566                         kafka_producer = start_producer()
1567                         if kafka_producer == nil {
1568                                 log.Debug("Could not start kafka producer - retrying")
1569                                 time.Sleep(1 * time.Second)
1570                         } else {
1571                                 log.Debug("Kafka producer started")
1572                                 //defer kafka_producer.Close()
1573                         }
1574                 }
1575         }
1576
1577         var event_chan = make(chan int)
1578         go func() {
1579                 for {
1580                         select {
1581                         case evt := <-kafka_producer.Events():
1582                                 //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend?
1583                                 switch evt.(type) {
1584                                 case *kafka.Message:
1585                                         m := evt.(*kafka.Message)
1586
1587                                         if m.TopicPartition.Error != nil {
1588                                                 log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
1589                                         } else {
1590                                                 log.Debug("Dumping topic writer event,  message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
1591                                         }
1592                                 case kafka.Error:
1593                                         log.Debug("Dumping topic writer event, error: ", evt)
1594                                 case kafka.OAuthBearerTokenRefresh:
1595                                         log.Debug("New producer token needed: ", evt)
1596                                         token, err := fetch_token()
1597                                         if err != nil {
1598                                                 log.Warning("Cannot cannot fetch token: ", err)
1599                                                 kafka_producer.SetOAuthBearerTokenFailure(err.Error())
1600                                         } else {
1601                                                 setTokenError := kafka_producer.SetOAuthBearerToken(*token)
1602                                                 if setTokenError != nil {
1603                                                         log.Warning("Cannot cannot set token: ", setTokenError)
1604                                                         kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
1605                                                 }
1606                                         }
1607                                 default:
1608                                         log.Debug("Dumping topic writer event, unknown: ", evt)
1609                                 }
1610
1611                         case msg := <-event_chan:
1612                                 if msg == 0 {
1613                                         return
1614                                 }
1615                         case <-time.After(1 * time.Second):
1616                                 if !running {
1617                                         return
1618                                 }
1619                         }
1620                 }
1621         }()
1622         go func() {
1623                 for {
1624                         select {
1625                         case writer_ctl := <-control_ch:
1626                                 if writer_ctl.command == "EXIT" {
1627                                         // ignore - wait for channel signal
1628                                 }
1629
1630                         case kmsg := <-data_ch:
1631                                 if kmsg == nil {
1632                                         event_chan <- 0
1633                                         // TODO: Close producer?
1634                                         log.Info("Topic writer stopped by channel signal - start_topic_writer")
1635                                         defer kafka_producer.Close()
1636                                         return
1637                                 }
1638
1639                                 retries := 10
1640                                 msg_ok := false
1641                                 var err error
1642                                 for retry := 1; retry <= retries && msg_ok == false; retry++ {
1643                                         err = kafka_producer.Produce(&kafka.Message{
1644                                                 TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
1645                                                 Value:          kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
1646
1647                                         if err == nil {
1648                                                 incr_out_msg_cnt(kmsg.jobid)
1649                                                 msg_ok = true
1650                                                 log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
1651                                         } else {
1652                                                 log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
1653                                                 time.Sleep(time.Duration(retry) * time.Second)
1654                                         }
1655                                 }
1656                                 if !msg_ok {
1657                                         //TODO: Retry sending msg?
1658                                         log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
1659                                 }
1660                         case <-time.After(1000 * time.Millisecond):
1661                                 if !running {
1662                                         return
1663                                 }
1664                         }
1665                 }
1666         }()
1667 }
1668
1669 func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
1670         var cm kafka.ConfigMap
1671         if creds_grant_type == "" {
1672                 log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
1673                 cm = kafka.ConfigMap{
1674                         "bootstrap.servers":  bootstrapserver,
1675                         "group.id":           gid,
1676                         "client.id":          cid,
1677                         "auto.offset.reset":  "latest",
1678                         "enable.auto.commit": false,
1679                         //"auto.commit.interval.ms": 5000,
1680                 }
1681         } else {
1682                 log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
1683                 cm = kafka.ConfigMap{
1684                         "bootstrap.servers":  bootstrapserver,
1685                         "group.id":           gid,
1686                         "client.id":          cid,
1687                         "auto.offset.reset":  "latest",
1688                         "enable.auto.commit": false,
1689                         "sasl.mechanism":     "OAUTHBEARER",
1690                         "security.protocol":  "SASL_PLAINTEXT",
1691                 }
1692         }
1693         c, err := kafka.NewConsumer(&cm)
1694
1695         //TODO: How to handle autocommit or commit message by message
1696         //TODO: Make arg to kafka configurable
1697
1698         if err != nil {
1699                 log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
1700                 return nil
1701         }
1702
1703         //c.Commit()
1704         log.Info("Created kafka consumer for type: ", type_id, " OK")
1705         return c
1706 }
1707
1708 // Start kafka producer
1709 func start_producer() *kafka.Producer {
1710         log.Info("Creating kafka producer")
1711
1712         var cm kafka.ConfigMap
1713         if creds_grant_type == "" {
1714                 log.Info("Creating kafka SASL plain text producer")
1715                 cm = kafka.ConfigMap{
1716                         "bootstrap.servers": bootstrapserver,
1717                 }
1718         } else {
1719                 log.Info("Creating kafka SASL plain text producer")
1720                 cm = kafka.ConfigMap{
1721                         "bootstrap.servers": bootstrapserver,
1722                         "sasl.mechanism":    "OAUTHBEARER",
1723                         "security.protocol": "SASL_PLAINTEXT",
1724                 }
1725         }
1726
1727         p, err := kafka.NewProducer(&cm)
1728         if err != nil {
1729                 log.Error("Cannot create kafka producer,", err)
1730                 return nil
1731         }
1732         return p
1733 }
1734
1735 func start_adminclient() *kafka.AdminClient {
1736         log.Info("Creating kafka admin client")
1737         a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
1738         if err != nil {
1739                 log.Error("Cannot create kafka admin client,", err)
1740                 return nil
1741         }
1742         return a
1743 }
1744
1745 func create_minio_client(id string) (*minio.Client, *error) {
1746         log.Debug("Get minio client")
1747         minio_client, err := minio.New(filestore_server, &minio.Options{
1748                 Secure: false,
1749                 Creds:  credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
1750         })
1751         if err != nil {
1752                 log.Error("Cannot create minio client, ", err)
1753                 return nil, &err
1754         }
1755         return minio_client, nil
1756 }
1757
1758 func incr_out_msg_cnt(jobid string) {
1759         j, ok := InfoJobs[jobid]
1760         if ok {
1761                 j.statistics.out_msg_cnt++
1762         }
1763 }
1764
1765 func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
1766
1767         log.Info("Type job", type_id, " started")
1768
1769         filters := make(map[string]Filter)
1770         topic_list := make(map[string]string)
1771         var mc *minio.Client
1772         const mc_id = "mc_" + "start_job_xml_file_data"
1773         running := true
1774         for {
1775                 select {
1776                 case job_ctl := <-control_ch:
1777                         log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
1778                         switch job_ctl.command {
1779                         case "EXIT":
1780                                 //ignore cmd - handled by channel signal
1781                         case "ADD-FILTER":
1782                                 filters[job_ctl.filter.JobId] = job_ctl.filter
1783                                 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
1784
1785                                 tmp_topic_list := make(map[string]string)
1786                                 for k, v := range topic_list {
1787                                         tmp_topic_list[k] = v
1788                                 }
1789                                 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
1790                                 topic_list = tmp_topic_list
1791                         case "REMOVE-FILTER":
1792                                 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
1793
1794                                 tmp_topic_list := make(map[string]string)
1795                                 for k, v := range topic_list {
1796                                         tmp_topic_list[k] = v
1797                                 }
1798                                 delete(tmp_topic_list, job_ctl.filter.JobId)
1799                                 topic_list = tmp_topic_list
1800                         }
1801
1802                 case msg := <-data_in_ch:
1803                         if msg == nil {
1804                                 log.Info("Type job ", type_id, " stopped by channel signal -  start_job_xml_file_data")
1805
1806                                 running = false
1807                                 return
1808                         }
1809                         if fsbucket != "" && fvolume == "" {
1810                                 if mc == nil {
1811                                         var err *error
1812                                         mc, err = create_minio_client(mc_id)
1813                                         if err != nil {
1814                                                 log.Debug("Cannot create minio client for type job: ", type_id)
1815                                         }
1816                                 }
1817                         }
1818                         //TODO: Sort processed file conversions in order (FIFO)
1819                         jobLimiterChan <- struct{}{}
1820                         go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
1821
1822                 case <-time.After(1 * time.Second):
1823                         if !running {
1824                                 return
1825                         }
1826                 }
1827         }
1828         //}()
1829 }
1830
1831 func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
1832         defer func() {
1833                 <-jobLimiterChan
1834         }()
1835         PrintMemUsage()
1836
1837         if fvolume == "" && fsbucket == "" {
1838                 log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
1839                 return
1840         } else if (fvolume != "") && (fsbucket != "") {
1841                 log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
1842                 return
1843         }
1844
1845         start := time.Now()
1846         var evt_data XmlFileEventHeader
1847
1848         err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
1849         if err != nil {
1850                 log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
1851                 return
1852         }
1853         log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
1854
1855         var reader io.Reader
1856
1857         //TODO -> config
1858         INPUTBUCKET := "ropfiles"
1859
1860         filename := ""
1861         if fvolume != "" {
1862                 filename = fvolume + "/" + evt_data.Name
1863                 fi, err := os.Open(filename)
1864
1865                 if err != nil {
1866                         log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
1867                         return
1868                 }
1869                 defer fi.Close()
1870                 reader = fi
1871                 //} else if evt_data.ObjectStoreBucket != "" {
1872         } else {
1873                 filename = evt_data.Name
1874                 if mc != nil {
1875                         tctx := context.Background()
1876                         mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
1877                         if err != nil {
1878                                 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
1879                                 return
1880                         }
1881                         if mr == nil {
1882                                 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader -  discarding message, error details: ", err)
1883                                 return
1884                         }
1885                         reader = mr
1886                         defer mr.Close()
1887                 } else {
1888                         log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client -  discarding message")
1889                         return
1890                 }
1891         }
1892
1893         if reader == nil {
1894                 log.Error("Cannot get: ", filename, " - null reader")
1895                 return
1896         }
1897         var file_bytes []byte
1898         if strings.HasSuffix(filename, "gz") {
1899                 start := time.Now()
1900                 var buf3 bytes.Buffer
1901                 errb := gunzipReaderToWriter(&buf3, reader)
1902                 if errb != nil {
1903                         log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
1904                         return
1905                 }
1906                 file_bytes = buf3.Bytes()
1907                 log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes))
1908
1909         } else {
1910                 var buf3 bytes.Buffer
1911                 _, err2 := io.Copy(&buf3, reader)
1912                 if err2 != nil {
1913                         log.Error("File ", filename, " - cannot be read, discarding message, ", err)
1914                         return
1915                 }
1916                 file_bytes = buf3.Bytes()
1917         }
1918         start = time.Now()
1919         b, err := xml_to_json_conv(&file_bytes, &evt_data)
1920         if err != nil {
1921                 log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
1922                 return
1923         }
1924         log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b))
1925
1926         new_fn := evt_data.Name + os.Getenv("KP") + ".json"
1927         if outputCompression == "gz" {
1928                 new_fn = new_fn + ".gz"
1929                 start = time.Now()
1930                 var buf bytes.Buffer
1931                 err = gzipWrite(&buf, &b)
1932                 if err != nil {
1933                         log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
1934                         return
1935                 }
1936                 b = buf.Bytes()
1937                 log.Debug("Gzip file:  ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes))
1938
1939         }
1940         start = time.Now()
1941
1942         if fvolume != "" {
1943                 //Store on disk
1944                 err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
1945                 if err != nil {
1946                         log.Error("Cannot write file ", new_fn, " - discarding message,", err)
1947                         return
1948                 }
1949                 log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes))
1950         } else if fsbucket != "" {
1951                 // Store in minio
1952                 objectName := new_fn
1953                 if mc != nil {
1954
1955                         contentType := "application/json"
1956                         if strings.HasSuffix(objectName, ".gz") {
1957                                 contentType = "application/gzip"
1958                         }
1959
1960                         // Upload the xml file with PutObject
1961                         r := bytes.NewReader(b)
1962                         tctx := context.Background()
1963                         if check_minio_bucket(mc, mc_id, fsbucket) == false {
1964                                 err := create_minio_bucket(mc, mc_id, fsbucket)
1965                                 if err != nil {
1966                                         log.Error("Cannot create bucket: ", fsbucket, ", ", err)
1967                                         return
1968                                 }
1969                         }
1970                         ok := false
1971                         for i := 1; i < 64 && ok == false; i = i * 2 {
1972                                 info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
1973                                 if err != nil {
1974
1975                                         if i == 1 {
1976                                                 log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
1977                                         } else {
1978                                                 log.Warn("Cannot upload (retry): ", objectName, ", ", err)
1979                                         }
1980                                         time.Sleep(time.Duration(i) * time.Second)
1981                                 } else {
1982                                         log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
1983                                         log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
1984                                         ok = true
1985                                 }
1986                         }
1987                         if !ok {
1988                                 log.Error("Cannot upload : ", objectName, ", ", err)
1989                         }
1990                 } else {
1991                         log.Error("Cannot upload: ", objectName, ", no client")
1992                 }
1993         }
1994
1995         start = time.Now()
1996         if fvolume == "" {
1997                 var fde FileDownloadedEvt
1998                 fde.Filename = new_fn
1999                 j, err := jsoniter.Marshal(fde)
2000
2001                 if err != nil {
2002                         log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
2003                         return
2004                 }
2005                 msg.msg.Value = j
2006         } else {
2007                 var fde FileDownloadedEvt
2008                 fde.Filename = new_fn
2009                 j, err := jsoniter.Marshal(fde)
2010
2011                 if err != nil {
2012                         log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
2013                         return
2014                 }
2015                 msg.msg.Value = j
2016         }
2017         msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
2018         log.Debug("Marshal file-collect event ", time.Since(start).String())
2019
2020         for k, v := range topic_list {
2021                 var kmsg *KafkaPayload = new(KafkaPayload)
2022                 kmsg.msg = msg.msg
2023                 kmsg.topic = v
2024                 kmsg.jobid = k
2025                 data_out_channel <- kmsg
2026         }
2027 }
2028
2029 func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
2030         var f MeasCollecFile
2031         start := time.Now()
2032         err := xml.Unmarshal(*f_byteValue, &f)
2033         if err != nil {
2034                 return nil, errors.New("Cannot unmarshal xml-file")
2035         }
2036         log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
2037
2038         start = time.Now()
2039         var pmfile PMJsonFile
2040
2041         //TODO: Fill in more values
2042         pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
2043         pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
2044         pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
2045         pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
2046         pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
2047
2048         for _, it := range f.MeasData.MeasInfo {
2049                 var mili MeasInfoList
2050                 mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
2051                 for _, jt := range it.MeasType {
2052                         mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
2053                 }
2054                 for _, jt := range it.MeasValue {
2055                         var mv MeasValues
2056                         mv.MeasObjInstID = jt.MeasObjLdn
2057                         mv.SuspectFlag = jt.Suspect
2058                         if jt.Suspect == "" {
2059                                 mv.SuspectFlag = "false"
2060                         }
2061                         for _, kt := range jt.R {
2062                                 ni, _ := strconv.Atoi(kt.P)
2063                                 nv := kt.Text
2064                                 mr := MeasResults{ni, nv}
2065                                 mv.MeasResultsList = append(mv.MeasResultsList, mr)
2066                         }
2067                         mili.MeasValuesList = append(mili.MeasValuesList, mv)
2068                 }
2069
2070                 pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
2071         }
2072
2073         pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
2074
2075         //TODO: Fill more values
2076         pmfile.Event.CommonEventHeader.Domain = ""    //xfeh.Domain
2077         pmfile.Event.CommonEventHeader.EventID = ""   //xfeh.EventID
2078         pmfile.Event.CommonEventHeader.Sequence = 0   //xfeh.Sequence
2079         pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
2080         pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
2081         pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
2082         pmfile.Event.CommonEventHeader.Priority = ""            //xfeh.Priority
2083         pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
2084         pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
2085         pmfile.Event.CommonEventHeader.Version = ""                 //xfeh.Version
2086         pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
2087         pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
2088
2089         log.Debug("Convert xml to json : ", time.Since(start).String())
2090
2091         start = time.Now()
2092         json, err := jsoniter.Marshal(pmfile)
2093         log.Debug("Marshal json : ", time.Since(start).String())
2094
2095         if err != nil {
2096                 return nil, errors.New("Cannot marshal converted json")
2097         }
2098         return json, nil
2099 }
2100
2101 func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
2102
2103         log.Info("Type job", type_id, " started")
2104
2105         filters := make(map[string]Filter)
2106         filterParams_list := make(map[string]FilterMaps)
2107         // ch_list := make(map[string]chan *KafkaPayload)
2108         topic_list := make(map[string]string)
2109         var mc *minio.Client
2110         const mc_id = "mc_" + "start_job_json_file_data"
2111         running := true
2112         for {
2113                 select {
2114                 case job_ctl := <-control_ch:
2115                         log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
2116                         switch job_ctl.command {
2117                         case "EXIT":
2118                                 //ignore cmd - handled by channel signal
2119                         case "ADD-FILTER":
2120                                 //TODO: Refactor...
2121                                 filters[job_ctl.filter.JobId] = job_ctl.filter
2122                                 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
2123
2124                                 tmp_filterParams_list := make(map[string]FilterMaps)
2125                                 for k, v := range filterParams_list {
2126                                         tmp_filterParams_list[k] = v
2127                                 }
2128                                 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
2129                                 filterParams_list = tmp_filterParams_list
2130
2131                                 tmp_topic_list := make(map[string]string)
2132                                 for k, v := range topic_list {
2133                                         tmp_topic_list[k] = v
2134                                 }
2135                                 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
2136                                 topic_list = tmp_topic_list
2137                         case "REMOVE-FILTER":
2138                                 //TODO: Refactor...
2139                                 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
2140
2141                                 tmp_filterParams_list := make(map[string]FilterMaps)
2142                                 for k, v := range filterParams_list {
2143                                         tmp_filterParams_list[k] = v
2144                                 }
2145                                 delete(tmp_filterParams_list, job_ctl.filter.JobId)
2146                                 filterParams_list = tmp_filterParams_list
2147
2148                                 tmp_topic_list := make(map[string]string)
2149                                 for k, v := range topic_list {
2150                                         tmp_topic_list[k] = v
2151                                 }
2152                                 delete(tmp_topic_list, job_ctl.filter.JobId)
2153                                 topic_list = tmp_topic_list
2154                         }
2155
2156                 case msg := <-data_in_ch:
2157                         if msg == nil {
2158                                 log.Info("Type job ", type_id, " stopped by channel signal -  start_job_xml_file_data")
2159
2160                                 running = false
2161                                 return
2162                         }
2163                         if objectstore {
2164                                 if mc == nil {
2165                                         var err *error
2166                                         mc, err = create_minio_client(mc_id)
2167                                         if err != nil {
2168                                                 log.Debug("Cannot create minio client for type job: ", type_id)
2169                                         }
2170                                 }
2171                         }
2172                         //TODO: Sort processed file conversions in order (FIFO)
2173                         jobLimiterChan <- struct{}{}
2174                         go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
2175
2176                 case <-time.After(1 * time.Second):
2177                         if !running {
2178                                 return
2179                         }
2180                 }
2181         }
2182 }
2183
2184 func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2185
2186         //Release job limit
2187         defer func() {
2188                 <-jobLimiterChan
2189         }()
2190
2191         PrintMemUsage()
2192
2193         var evt_data FileDownloadedEvt
2194         err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2195         if err != nil {
2196                 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2197                 return
2198         }
2199         log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2200
2201         var reader io.Reader
2202
2203         //TODO -> config
2204         //INPUTBUCKET := "json-file-ready"
2205         INPUTBUCKET := "pm-files-json"
2206         filename := ""
2207         if objectstore == false {
2208                 filename = files_volume + "/" + evt_data.Filename
2209                 fi, err := os.Open(filename)
2210
2211                 if err != nil {
2212                         log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
2213                         return
2214                 }
2215                 defer fi.Close()
2216                 reader = fi
2217         } else {
2218                 filename = "/" + evt_data.Filename
2219                 if mc != nil {
2220                         if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
2221                                 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2222                                 return
2223                         }
2224                         tctx := context.Background()
2225                         mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2226                         if err != nil {
2227                                 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2228                                 return
2229                         }
2230                         reader = mr
2231                         defer mr.Close()
2232                 } else {
2233                         log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2234                         return
2235                 }
2236         }
2237
2238         var data *[]byte
2239         if strings.HasSuffix(filename, "gz") {
2240                 start := time.Now()
2241                 var buf2 bytes.Buffer
2242                 errb := gunzipReaderToWriter(&buf2, reader)
2243                 if errb != nil {
2244                         log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2245                         return
2246                 }
2247                 d := buf2.Bytes()
2248                 data = &d
2249                 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2250         } else {
2251
2252                 start := time.Now()
2253                 d, err := io.ReadAll(reader)
2254                 if err != nil {
2255                         log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2256                         return
2257                 }
2258                 data = &d
2259
2260                 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2261         }
2262
2263         // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
2264         // var pmfile PMJsonFile
2265         // start := time.Now()
2266         // err = jsoniter.Unmarshal(*data, &pmfile)
2267         // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2268
2269         // if err != nil {
2270         //      log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2271         //      return
2272         // }
2273         for k, v := range filterList {
2274
2275                 var pmfile PMJsonFile
2276                 start := time.Now()
2277                 err = jsoniter.Unmarshal(*data, &pmfile)
2278                 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2279
2280                 if err != nil {
2281                         log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2282                         return
2283                 }
2284
2285                 var kmsg *KafkaPayload = new(KafkaPayload)
2286                 kmsg.msg = new(kafka.Message)
2287                 kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
2288                 log.Debug("topic:", topic_list[k])
2289                 log.Debug("sourceNameMap:", v.sourceNameMap)
2290                 log.Debug("measObjClassMap:", v.measObjClassMap)
2291                 log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
2292                 log.Debug("measTypesMap:", v.measTypesMap)
2293                 //BMX if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
2294                 b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2295                 if b == nil {
2296                         log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
2297                         return
2298                 }
2299                 kmsg.msg.Value = *b
2300                 //BMX}
2301
2302                 // if outputCompression == "json.gz" {
2303                 //      start := time.Now()
2304                 //      var buf bytes.Buffer
2305                 //      err := gzipWrite(&buf, &kmsg.msg.Value)
2306                 //      if err != nil {
2307                 //              log.Error("Cannot compress file/obj ", filename, "for job: ", job_id, " - discarding message, error details", err)
2308                 //              return
2309
2310                 //      }
2311                 //      kmsg.msg.Value = buf.Bytes()
2312                 //      log.Debug("Compress file/obj ", filename, "for job: ", job_id, " time:", time.Since(start).String())
2313                 // }
2314                 kmsg.topic = topic_list[k]
2315                 kmsg.jobid = k
2316
2317                 data_out_channel <- kmsg
2318         }
2319
2320 }
2321
2322 func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
2323
2324         if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
2325                 return nil
2326         }
2327         start := time.Now()
2328         j, err := jsoniter.Marshal(&data)
2329
2330         log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
2331
2332         if err != nil {
2333                 log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
2334                 return nil
2335         }
2336
2337         log.Debug("Filtered json obj: ", resource, " len: ", len(j))
2338         return &j
2339 }
2340
2341 func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
2342         filter_req := true
2343         start := time.Now()
2344         if len(sourceNameMap) != 0 {
2345                 if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
2346                         filter_req = false
2347                         return nil
2348                 }
2349         }
2350         if filter_req {
2351                 modified := false
2352                 var temp_mil []MeasInfoList
2353                 for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2354
2355                         check_cntr := false
2356                         var cnt_flags []bool
2357                         if len(measTypesMap) > 0 {
2358                                 c_cntr := 0
2359                                 var temp_mtl []string
2360                                 for _, v := range zz.MeasTypes.SMeasTypesList {
2361                                         if measTypesMap[v] {
2362                                                 cnt_flags = append(cnt_flags, true)
2363                                                 c_cntr++
2364                                                 temp_mtl = append(temp_mtl, v)
2365                                         } else {
2366                                                 cnt_flags = append(cnt_flags, false)
2367                                         }
2368                                 }
2369                                 if c_cntr > 0 {
2370                                         check_cntr = true
2371                                         zz.MeasTypes.SMeasTypesList = temp_mtl
2372                                 } else {
2373                                         modified = true
2374                                         continue
2375                                 }
2376                         }
2377                         keep := false
2378                         var temp_mvl []MeasValues
2379                         for _, yy := range zz.MeasValuesList {
2380                                 keep_class := false
2381                                 keep_inst := false
2382                                 keep_cntr := false
2383
2384                                 dna := strings.Split(yy.MeasObjInstID, ",")
2385                                 instName := dna[len(dna)-1]
2386                                 cls := strings.Split(dna[len(dna)-1], "=")[0]
2387
2388                                 if len(measObjClassMap) > 0 {
2389                                         if measObjClassMap[cls] {
2390                                                 keep_class = true
2391                                         }
2392                                 } else {
2393                                         keep_class = true
2394                                 }
2395
2396                                 if len(measObjInstIdsMap) > 0 {
2397                                         if measObjInstIdsMap[instName] {
2398                                                 keep_inst = true
2399                                         }
2400                                 } else {
2401                                         keep_inst = true
2402                                 }
2403
2404                                 if check_cntr {
2405                                         var temp_mrl []MeasResults
2406                                         cnt_p := 1
2407                                         for _, v := range yy.MeasResultsList {
2408                                                 if cnt_flags[v.P-1] {
2409                                                         v.P = cnt_p
2410                                                         cnt_p++
2411                                                         temp_mrl = append(temp_mrl, v)
2412                                                 }
2413                                         }
2414                                         yy.MeasResultsList = temp_mrl
2415                                         keep_cntr = true
2416                                 } else {
2417                                         keep_cntr = true
2418                                 }
2419                                 if keep_class && keep_cntr && keep_inst {
2420                                         keep = true
2421                                         temp_mvl = append(temp_mvl, yy)
2422                                 }
2423                         }
2424                         if keep {
2425                                 zz.MeasValuesList = temp_mvl
2426                                 temp_mil = append(temp_mil, zz)
2427                                 modified = true
2428                         }
2429
2430                 }
2431                 //Only if modified
2432                 if modified {
2433                         if len(temp_mil) == 0 {
2434                                 log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
2435                                 return nil
2436                         }
2437                         data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
2438                 }
2439         }
2440         log.Debug("Filter: ", time.Since(start).String())
2441         return data
2442 }
2443
2444 // func json_pm_filter(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
2445
2446 //      filter_req := true
2447 //      start := time.Now()
2448 //      if len(sourceNameMap) != 0 {
2449 //              if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
2450 //                      filter_req = false
2451 //                      return nil
2452 //              }
2453 //      }
2454 //      if filter_req {
2455 //              modified := false
2456 //              var temp_mil []MeasInfoList
2457 //              for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2458
2459 //                      check_cntr := false
2460 //                      var cnt_flags []bool
2461 //                      if len(measTypesMap) > 0 {
2462 //                              c_cntr := 0
2463 //                              var temp_mtl []string
2464 //                              for _, v := range zz.MeasTypes.SMeasTypesList {
2465 //                                      if measTypesMap[v] {
2466 //                                              cnt_flags = append(cnt_flags, true)
2467 //                                              c_cntr++
2468 //                                              temp_mtl = append(temp_mtl, v)
2469 //                                      } else {
2470 //                                              cnt_flags = append(cnt_flags, false)
2471 //                                      }
2472 //                              }
2473 //                              if c_cntr > 0 {
2474 //                                      check_cntr = true
2475 //                                      zz.MeasTypes.SMeasTypesList = temp_mtl
2476 //                              } else {
2477 //                                      modified = true
2478 //                                      continue
2479 //                              }
2480 //                      }
2481 //                      keep := false
2482 //                      var temp_mvl []MeasValues
2483 //                      for _, yy := range zz.MeasValuesList {
2484 //                              keep_class := false
2485 //                              keep_inst := false
2486 //                              keep_cntr := false
2487
2488 //                              dna := strings.Split(yy.MeasObjInstID, ",")
2489 //                              instName := dna[len(dna)-1]
2490 //                              cls := strings.Split(dna[len(dna)-1], "=")[0]
2491
2492 //                              if len(measObjClassMap) > 0 {
2493 //                                      if measObjClassMap[cls] {
2494 //                                              keep_class = true
2495 //                                      }
2496 //                              } else {
2497 //                                      keep_class = true
2498 //                              }
2499
2500 //                              if len(measObjInstIdsMap) > 0 {
2501 //                                      if measObjInstIdsMap[instName] {
2502 //                                              keep_inst = true
2503 //                                      }
2504 //                              } else {
2505 //                                      keep_inst = true
2506 //                              }
2507
2508 //                              if check_cntr {
2509 //                                      var temp_mrl []MeasResults
2510 //                                      cnt_p := 1
2511 //                                      for _, v := range yy.MeasResultsList {
2512 //                                              if cnt_flags[v.P-1] {
2513 //                                                      v.P = cnt_p
2514 //                                                      cnt_p++
2515 //                                                      temp_mrl = append(temp_mrl, v)
2516 //                                              }
2517 //                                      }
2518 //                                      yy.MeasResultsList = temp_mrl
2519 //                                      keep_cntr = true
2520 //                              } else {
2521 //                                      keep_cntr = true
2522 //                              }
2523 //                              if keep_class && keep_cntr && keep_inst {
2524 //                                      keep = true
2525 //                                      temp_mvl = append(temp_mvl, yy)
2526 //                              }
2527 //                      }
2528 //                      if keep {
2529 //                              zz.MeasValuesList = temp_mvl
2530 //                              temp_mil = append(temp_mil, zz)
2531 //                              modified = true
2532 //                      }
2533
2534 //              }
2535 //              //Only if modified
2536 //              if modified {
2537 //                      if len(temp_mil) == 0 {
2538 //                              log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
2539 //                              return nil
2540 //                      }
2541 //                      data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
2542 //              }
2543 //      }
2544 //      log.Debug("Filter: ", time.Since(start).String())
2545
2546 //      start = time.Now()
2547 //      j, err := jsoniter.Marshal(&data)
2548
2549 //      log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
2550
2551 //      if err != nil {
2552 //              log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
2553 //              return nil
2554 //      }
2555
2556 //      log.Debug("Filtered json obj: ", resource, " len: ", len(j))
2557 //      return &j
2558 // }
2559
2560 func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
2561
2562         log.Info("Type job", type_id, " started")
2563         log.Debug("influx job ch ", data_in_ch)
2564         filters := make(map[string]Filter)
2565         filterParams_list := make(map[string]FilterMaps)
2566         influx_job_params := make(map[string]InfluxJobParameters)
2567         var mc *minio.Client
2568         const mc_id = "mc_" + "start_job_json_file_data_influx"
2569         running := true
2570         for {
2571                 select {
2572                 case job_ctl := <-control_ch:
2573                         log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
2574                         switch job_ctl.command {
2575                         case "EXIT":
2576                                 //ignore cmd - handled by channel signal
2577                         case "ADD-FILTER":
2578                                 //TODO: Refactor...
2579                                 filters[job_ctl.filter.JobId] = job_ctl.filter
2580                                 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
2581                                 log.Debug(job_ctl.filter)
2582                                 tmp_filterParams_list := make(map[string]FilterMaps)
2583                                 for k, v := range filterParams_list {
2584                                         tmp_filterParams_list[k] = v
2585                                 }
2586                                 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
2587                                 filterParams_list = tmp_filterParams_list
2588
2589                                 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2590                                 for k, v := range influx_job_params {
2591                                         tmp_influx_job_params[k] = v
2592                                 }
2593                                 tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
2594                                 influx_job_params = tmp_influx_job_params
2595
2596                         case "REMOVE-FILTER":
2597                                 //TODO: Refactor...
2598                                 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
2599
2600                                 tmp_filterParams_list := make(map[string]FilterMaps)
2601                                 for k, v := range filterParams_list {
2602                                         tmp_filterParams_list[k] = v
2603                                 }
2604                                 delete(tmp_filterParams_list, job_ctl.filter.JobId)
2605                                 filterParams_list = tmp_filterParams_list
2606
2607                                 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2608                                 for k, v := range influx_job_params {
2609                                         tmp_influx_job_params[k] = v
2610                                 }
2611                                 delete(tmp_influx_job_params, job_ctl.filter.JobId)
2612                                 influx_job_params = tmp_influx_job_params
2613                         }
2614
2615                 case msg := <-data_in_ch:
2616                         log.Debug("Data reveived - influx")
2617                         if msg == nil {
2618                                 log.Info("Type job ", type_id, " stopped by channel signal -  start_job_xml_file_data")
2619
2620                                 running = false
2621                                 return
2622                         }
2623                         if objectstore {
2624                                 if mc == nil {
2625                                         var err *error
2626                                         mc, err = create_minio_client(mc_id)
2627                                         if err != nil {
2628                                                 log.Debug("Cannot create minio client for type job: ", type_id)
2629                                         }
2630                                 }
2631                         }
2632                         //TODO: Sort processed file conversions in order (FIFO)
2633                         jobLimiterChan <- struct{}{}
2634                         go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
2635
2636                 case <-time.After(1 * time.Second):
2637                         if !running {
2638                                 return
2639                         }
2640                 }
2641         }
2642 }
2643
2644 func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2645
2646         log.Debug("run_json_file_data_job_influx")
2647         //Release job limit
2648         defer func() {
2649                 <-jobLimiterChan
2650         }()
2651
2652         PrintMemUsage()
2653
2654         var evt_data FileDownloadedEvt
2655         err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2656         if err != nil {
2657                 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2658                 return
2659         }
2660         log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2661
2662         var reader io.Reader
2663
2664         //TODO -> config
2665         //INPUTBUCKET := "json-file-ready"
2666         INPUTBUCKET := "pm-files-json"
2667         filename := ""
2668         if objectstore == false {
2669                 filename = files_volume + "/" + evt_data.Filename
2670                 fi, err := os.Open(filename)
2671
2672                 if err != nil {
2673                         log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
2674                         return
2675                 }
2676                 defer fi.Close()
2677                 reader = fi
2678         } else {
2679                 filename = "/" + evt_data.Filename
2680                 if mc != nil {
2681                         if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
2682                                 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2683                                 return
2684                         }
2685                         tctx := context.Background()
2686                         mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2687                         if err != nil {
2688                                 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2689                                 return
2690                         }
2691                         reader = mr
2692                         defer mr.Close()
2693                 } else {
2694                         log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2695                         return
2696                 }
2697         }
2698
2699         var data *[]byte
2700         if strings.HasSuffix(filename, "gz") {
2701                 start := time.Now()
2702                 var buf2 bytes.Buffer
2703                 errb := gunzipReaderToWriter(&buf2, reader)
2704                 if errb != nil {
2705                         log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2706                         return
2707                 }
2708                 d := buf2.Bytes()
2709                 data = &d
2710                 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2711         } else {
2712
2713                 start := time.Now()
2714                 d, err := io.ReadAll(reader)
2715                 if err != nil {
2716                         log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2717                         return
2718                 }
2719                 data = &d
2720
2721                 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2722         }
2723         // The unmarshalling was moved inside the for loop below - keeping it outside would need a deep copy of PMJsonFile for each job
2724         // var pmfile PMJsonFile
2725         // start := time.Now()
2726         // err = jsoniter.Unmarshal(*data, &pmfile)
2727         // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2728
2729         // if err != nil {
2730         //      log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2731         //      return
2732         // }
2733         for k, v := range filterList {
2734
2735                 var pmfile PMJsonFile
2736                 start := time.Now()
2737                 err = jsoniter.Unmarshal(*data, &pmfile)
2738                 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2739
2740                 if err != nil {
2741                         log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2742                         return
2743                 }
2744
2745                 if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
2746                         b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2747                         if b == nil {
2748                                 log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
2749                                 return
2750                         }
2751
2752                 }
2753                 fluxParms := influxList[k]
2754                 log.Debug("Influxdb params: ", fluxParms)
2755                 client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
2756                 writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
2757
2758                 // fmt.Println(pmfile.Event.CommonEventHeader.StartEpochMicrosec)
2759                 // tUnix := pmfile.Event.CommonEventHeader.StartEpochMicrosec / int64(time.Millisecond)
2760                 // tUnixNanoRemainder := (pmfile.Event.CommonEventHeader.StartEpochMicrosec % int64(time.Millisecond)) * int64(time.Microsecond)
2761                 // timeT := time.Unix(tUnix, tUnixNanoRemainder)
2762                 // fmt.Println(timeT)
2763                 // fmt.Println("======================")
2764                 for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2765                         ctr_names := make(map[string]string)
2766                         for cni, cn := range zz.MeasTypes.SMeasTypesList {
2767                                 ctr_names[string(cni+1)] = cn
2768                         }
2769                         for _, xx := range zz.MeasValuesList {
2770                                 log.Debug("Measurement: ", xx.MeasObjInstID)
2771                                 log.Debug("Suspect flag: ", xx.SuspectFlag)
2772                                 p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
2773                                 p.AddField("suspectflag", xx.SuspectFlag)
2774                                 p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
2775                                 p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
2776                                 for _, yy := range xx.MeasResultsList {
2777                                         pi := string(yy.P)
2778                                         pv := yy.SValue
2779                                         pn := ctr_names[pi]
2780                                         log.Debug("Counter: ", pn, " Value: ", pv)
2781                                         pv_i, err := strconv.Atoi(pv)
2782                                         if err == nil {
2783                                                 p.AddField(pn, pv_i)
2784                                         } else {
2785                                                 p.AddField(pn, pv)
2786                                         }
2787                                 }
2788                                 //p.SetTime(timeT)
2789                                 log.Debug("StartEpochMicrosec from common event header:  ", pmfile.Event.CommonEventHeader.StartEpochMicrosec)
2790                                 log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2791                                 p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2792                                 err := writeAPI.WritePoint(context.Background(), p)
2793                                 if err != nil {
2794                                         log.Error("Db write error: ", err)
2795                                 }
2796                         }
2797
2798                 }
2799                 client.Close()
2800         }
2801
2802 }