[nonrtric/plt/ranpm.git] / pm-file-converter / main.go
1 //  ============LICENSE_START===============================================
2 //  Copyright (C) 2023 Nordix Foundation. All rights reserved.
3 //  ========================================================================
4 //  Licensed under the Apache License, Version 2.0 (the "License");
5 //  you may not use this file except in compliance with the License.
6 //  You may obtain a copy of the License at
7 //
8 //       http://www.apache.org/licenses/LICENSE-2.0
9 //
10 //  Unless required by applicable law or agreed to in writing, software
11 //  distributed under the License is distributed on an "AS IS" BASIS,
12 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 //  See the License for the specific language governing permissions and
14 //  limitations under the License.
15 //  ============LICENSE_END=================================================
16 //
17
18 package main
19
20 import (
21         "bytes"
22         "compress/gzip"
23         "context"
24         "crypto/tls"
25         "encoding/json"
26         "encoding/xml"
27         "errors"
28         "fmt"
29         "io"
30         "net"
31         "os/signal"
32         "reflect"
33         "strings"
34         "sync"
35         "syscall"
36
37         "net/http"
38         "os"
39         "runtime"
40         "strconv"
41         "time"
42
43         "github.com/google/uuid"
44         "golang.org/x/oauth2/clientcredentials"
45
46         log "github.com/sirupsen/logrus"
47
48         "github.com/gorilla/mux"
49
50         "net/http/pprof"
51
52         "github.com/confluentinc/confluent-kafka-go/kafka"
53         influxdb2 "github.com/influxdata/influxdb-client-go/v2"
54         jsoniter "github.com/json-iterator/go"
55         "github.com/minio/minio-go/v7"
56         "github.com/minio/minio-go/v7/pkg/credentials"
57 )
58
59 //== Constants ==//
60
61 const http_port = 80
62 const https_port = 443
63 const config_file = "application_configuration.json"
64 const server_crt = "server.crt"
65 const server_key = "server.key"
66
67 const producer_name = "kafka-producer"
68
69 const registration_delay_short = 2
70 const registration_delay_long = 120
71
72 const mutexLocked = 1
73
74 const (
75         Init AppStates = iota
76         Running
77         Terminating
78 )
79
80 const reader_queue_length = 100 //Per type job
81 const writer_queue_length = 100 //Per info job
82 const parallelism_limiter = 100 //For all jobs
83
84 // These are optional - only needed when the SASL protocol is used towards Kafka
85 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
86 var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
87 var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
88 var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
89
90 //== Types ==//
91
92 type AppStates int64
93
94 type FilterParameters struct {
95         MeasuredEntityDns []string `json:"measuredEntityDns"`
96         MeasTypes         []string `json:"measTypes"`
97         MeasObjClass      []string `json:"measObjClass"`
98         MeasObjInstIds    []string `json:"measObjInstIds"`
99 }
100
101 type InfoJobDataType struct {
102         InfoJobData struct {
103                 KafkaOutputTopic string `json:"kafkaOutputTopic"`
104
105                 DbUrl    string `json:"db-url"`
106                 DbOrg    string `json:"db-org"`
107                 DbBucket string `json:"db-bucket"`
108                 DbToken  string `json:"db-token"`
109
110                 FilterParams FilterParameters `json:"filter"`
111         } `json:"info_job_data"`
112         InfoJobIdentity  string `json:"info_job_identity"`
113         InfoTypeIdentity string `json:"info_type_identity"`
114         LastUpdated      string `json:"last_updated"`
115         Owner            string `json:"owner"`
116         TargetURI        string `json:"target_uri"`
117 }
118
119 // Type for an infojob
120 type InfoJobRecord struct {
121         job_info     InfoJobDataType
122         output_topic string
123
124         statistics *InfoJobStats
125 }
126
127 // Type for a type job
128 type TypeJobRecord struct {
129         InfoType        string
130         InputTopic      string
131         data_in_channel chan *KafkaPayload
132         reader_control  chan ReaderControl
133         job_control     chan JobControl
134         groupId         string
135         clientId        string
136
137         statistics *TypeJobStats
138 }
139
140 // Type for controlling the topic reader
141 type ReaderControl struct {
142         command string
143 }
144
145 // Type for controlling the topic writer
146 type WriterControl struct {
147         command string
148 }
149
150 // Type for controlling the job
151 type JobControl struct {
152         command string
153         filter  Filter
154 }
155
156 type KafkaPayload struct {
157         msg   *kafka.Message
158         topic string
159         jobid string
160 }
161
162 type FilterMaps struct {
163         sourceNameMap     map[string]bool
164         measObjClassMap   map[string]bool
165         measObjInstIdsMap map[string]bool
166         measTypesMap      map[string]bool
167 }
168
169 type InfluxJobParameters struct {
170         DbUrl    string
171         DbOrg    string
172         DbBucket string
173         DbToken  string
174 }
175
176 type Filter struct {
177         JobId       string
178         OutputTopic string
179         filter      FilterMaps
180
181         influxParameters InfluxJobParameters
182 }
183
184 // Type for info job statistics
185 type InfoJobStats struct {
186         out_msg_cnt  int
187         out_data_vol int64
188 }
189
190 // Type for type job statistics
191 type TypeJobStats struct {
192         in_msg_cnt  int
193         in_data_vol int64
194 }
195
196 // == API Datatypes ==//
197 // Type for supported data types
198 type DataType struct {
199         ID                 string `json:"id"`
200         KafkaInputTopic    string `json:"kafkaInputTopic"`
201         InputJobType       string `json:"inputJobType"`
202         InputJobDefinition struct {
203                 KafkaOutputTopic string `json:"kafkaOutputTopic"`
204         } `json:"inputJobDefinition"`
205
206         ext_job         *[]byte
207         ext_job_created bool
208         ext_job_id      string
209 }
210
211 type DataTypes struct {
212         ProdDataTypes []DataType `json:"types"`
213 }
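
// Illustrative sketch (not taken from the repo) of what application_configuration.json is
// expected to contain, based on the DataType/DataTypes structs above - topic names and the
// selected type ids are placeholders:
//
//   {
//     "types": [
//       {
//         "id": "json-file-data-from-filestore",
//         "kafkaInputTopic": "example-input-topic",
//         "inputJobType": "example-input-job-type",
//         "inputJobDefinition": { "kafkaOutputTopic": "example-output-topic" }
//       }
//     ]
//   }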
214
215 type Minio_buckets struct {
216         Buckets map[string]bool
217 }
218
219 //== External data types ==//
220
221 // Data type for event xml file download
222 type XmlFileEventHeader struct {
223         ProductName        string `json:"productName"`
224         VendorName         string `json:"vendorName"`
225         Location           string `json:"location"`
226         Compression        string `json:"compression"`
227         SourceName         string `json:"sourceName"`
228         FileFormatType     string `json:"fileFormatType"`
229         FileFormatVersion  string `json:"fileFormatVersion"`
230         StartEpochMicrosec int64  `json:"startEpochMicrosec"`
231         LastEpochMicrosec  int64  `json:"lastEpochMicrosec"`
232         Name               string `json:"name"`
233         ChangeIdentifier   string `json:"changeIdentifier"`
234         InternalLocation   string `json:"internalLocation"`
235         TimeZoneOffset     string `json:"timeZoneOffset"`
236         //ObjectStoreBucket  string `json:"objectStoreBucket"`
237 }
238
239 // Data types for input xml file
240 type MeasCollecFile struct {
241         XMLName        xml.Name `xml:"measCollecFile"`
242         Text           string   `xml:",chardata"`
243         Xmlns          string   `xml:"xmlns,attr"`
244         Xsi            string   `xml:"xsi,attr"`
245         SchemaLocation string   `xml:"schemaLocation,attr"`
246         FileHeader     struct {
247                 Text              string `xml:",chardata"`
248                 FileFormatVersion string `xml:"fileFormatVersion,attr"`
249                 VendorName        string `xml:"vendorName,attr"`
250                 DnPrefix          string `xml:"dnPrefix,attr"`
251                 FileSender        struct {
252                         Text        string `xml:",chardata"`
253                         LocalDn     string `xml:"localDn,attr"`
254                         ElementType string `xml:"elementType,attr"`
255                 } `xml:"fileSender"`
256                 MeasCollec struct {
257                         Text      string `xml:",chardata"`
258                         BeginTime string `xml:"beginTime,attr"`
259                 } `xml:"measCollec"`
260         } `xml:"fileHeader"`
261         MeasData struct {
262                 Text           string `xml:",chardata"`
263                 ManagedElement struct {
264                         Text      string `xml:",chardata"`
265                         LocalDn   string `xml:"localDn,attr"`
266                         SwVersion string `xml:"swVersion,attr"`
267                 } `xml:"managedElement"`
268                 MeasInfo []struct {
269                         Text       string `xml:",chardata"`
270                         MeasInfoId string `xml:"measInfoId,attr"`
271                         Job        struct {
272                                 Text  string `xml:",chardata"`
273                                 JobId string `xml:"jobId,attr"`
274                         } `xml:"job"`
275                         GranPeriod struct {
276                                 Text     string `xml:",chardata"`
277                                 Duration string `xml:"duration,attr"`
278                                 EndTime  string `xml:"endTime,attr"`
279                         } `xml:"granPeriod"`
280                         RepPeriod struct {
281                                 Text     string `xml:",chardata"`
282                                 Duration string `xml:"duration,attr"`
283                         } `xml:"repPeriod"`
284                         MeasType []struct {
285                                 Text string `xml:",chardata"`
286                                 P    string `xml:"p,attr"`
287                         } `xml:"measType"`
288                         MeasValue []struct {
289                                 Text       string `xml:",chardata"`
290                                 MeasObjLdn string `xml:"measObjLdn,attr"`
291                                 R          []struct {
292                                         Text string `xml:",chardata"`
293                                         P    string `xml:"p,attr"`
294                                 } `xml:"r"`
295                                 Suspect string `xml:"suspect"`
296                         } `xml:"measValue"`
297                 } `xml:"measInfo"`
298         } `xml:"measData"`
299         FileFooter struct {
300                 Text       string `xml:",chardata"`
301                 MeasCollec struct {
302                         Text    string `xml:",chardata"`
303                         EndTime string `xml:"endTime,attr"`
304                 } `xml:"measCollec"`
305         } `xml:"fileFooter"`
306 }
307
308 // Data types for the json file
309 // Split into several parts to allow adding/removing items in lists
310 type MeasResults struct {
311         P      int    `json:"p"`
312         SValue string `json:"sValue"`
313 }
314
315 type MeasValues struct {
316         MeasObjInstID   string        `json:"measObjInstId"`
317         SuspectFlag     string        `json:"suspectFlag"`
318         MeasResultsList []MeasResults `json:"measResults"`
319 }
320
321 type SMeasTypes struct {
322         SMeasType string `json:"sMeasTypesList"`
323 }
324
325 type MeasInfoList struct {
326         MeasInfoID struct {
327                 SMeasInfoID string `json:"sMeasInfoId"`
328         } `json:"measInfoId"`
329         MeasTypes struct {
330                 SMeasTypesList []string `json:"sMeasTypesList"`
331         } `json:"measTypes"`
332         MeasValuesList []MeasValues `json:"measValuesList"`
333 }
334
335 type PMJsonFile struct {
336         Event struct {
337                 CommonEventHeader struct {
338                         Domain                  string `json:"domain"`
339                         EventID                 string `json:"eventId"`
340                         Sequence                int    `json:"sequence"`
341                         EventName               string `json:"eventName"`
342                         SourceName              string `json:"sourceName"`
343                         ReportingEntityName     string `json:"reportingEntityName"`
344                         Priority                string `json:"priority"`
345                         StartEpochMicrosec      int64  `json:"startEpochMicrosec"`
346                         LastEpochMicrosec       int64  `json:"lastEpochMicrosec"`
347                         Version                 string `json:"version"`
348                         VesEventListenerVersion string `json:"vesEventListenerVersion"`
349                         TimeZoneOffset          string `json:"timeZoneOffset"`
350                 } `json:"commonEventHeader"`
351                 Perf3GppFields struct {
352                         Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
353                         MeasDataCollection    struct {
354                                 GranularityPeriod             int            `json:"granularityPeriod"`
355                                 MeasuredEntityUserName        string         `json:"measuredEntityUserName"`
356                                 MeasuredEntityDn              string         `json:"measuredEntityDn"`
357                                 MeasuredEntitySoftwareVersion string         `json:"measuredEntitySoftwareVersion"`
358                                 SMeasInfoList                 []MeasInfoList `json:"measInfoList"`
359                         } `json:"measDataCollection"`
360                 } `json:"perf3gppFields"`
361         } `json:"event"`
362 }
363
364 // Data type for converted json file message
365 type FileDownloadedEvt struct {
366         Filename string `json:"filename"`
367 }
368
369 //== Variables ==//
370
371 var AppState = Init
372
373 // Lock for all internal data
374 var datalock sync.Mutex
375
376 var producer_instance_name string = producer_name
377
378 // Keep all info type jobs, key == type id
379 var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
380
381 // Keep all info jobs, key == job id
382 var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
383
384 var InfoTypes DataTypes
385
386 // Limiter - valid for all jobs
387 var jobLimiterChan = make(chan struct{}, parallelism_limiter)
388
389 // TODO: Config param?
390 var bucket_location = "swe"
391
392 var httpclient = &http.Client{}
393
394 // == Env variables ==//
395 var bootstrapserver = os.Getenv("KAFKA_SERVER")
396 var files_volume = os.Getenv("FILES_VOLUME")
397 var ics_server = os.Getenv("ICS")
398 var self = os.Getenv("SELF")
399 var filestore_user = os.Getenv("FILESTORE_USER")
400 var filestore_pwd = os.Getenv("FILESTORE_PWD")
401 var filestore_server = os.Getenv("FILESTORE_SERVER")
402
403 var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
404 var writer_control = make(chan WriterControl, 1)
405
406 var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
407
408 // == Main ==//
409 func main() {
410
411         //log.SetLevel(log.InfoLevel)
412         log.SetLevel(log.TraceLevel)
413
414         log.Info("Server starting...")
415
416         if self == "" {
417                 log.Panic("Env SELF not configured")
418         }
419         if bootstrapserver == "" {
420                 log.Panic("Env KAFKA_SERVER not set")
421         }
422         if ics_server == "" {
423                 log.Panic("Env ICS not set")
424         }
425         if os.Getenv("KP") != "" {
426                 producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
427         }
428
429         rtr := mux.NewRouter()
430         rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
431         rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
432         rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
433         rtr.HandleFunc("/statistics", statistics)
434         rtr.HandleFunc("/logging/{level}", logging_level)
435         rtr.HandleFunc("/logging", logging_level)
436         rtr.HandleFunc("/", alive)
437
438         //For perf/mem profiling
439         rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
440
441         http.Handle("/", rtr)
442
443         http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
444
445         cer, err := tls.LoadX509KeyPair(server_crt, server_key)
446         if err != nil {
447                 log.Error("Cannot load key and cert - ", err)
448                 return
449         }
450         config := &tls.Config{Certificates: []tls.Certificate{cer}}
451         https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
452
453         // Run http
454         go func() {
455                 log.Info("Starting http service...")
456                 err := http_server.ListenAndServe()
457                 if err == http.ErrServerClosed { // graceful shutdown
458                         log.Info("http server shutdown...")
459                 } else if err != nil {
460                         log.Error("http server error: ", err)
461                 }
462         }()
463
464         //  Run https
465         go func() {
466                 log.Info("Starting https service...")
467                 err := https_server.ListenAndServe()
468                 if err == http.ErrServerClosed { // graceful shutdown
469                         log.Info("https server shutdown...")
470                 } else if err != nil {
471                         log.Error("https server error: ", err)
472                 }
473         }()
474         check_tcp(strconv.Itoa(http_port))
475         check_tcp(strconv.Itoa(https_port))
476
477         go start_topic_writer(writer_control, data_out_channel)
478
479         //Setup proc for periodic type registration
480         var event_chan = make(chan int) //Channel for stopping the proc
481         go periodic_registration(event_chan)
482
483         //Wait for term/int signal to try to shut down gracefully
484         sigs := make(chan os.Signal, 1)
485         signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
486         go func() {
487                 sig := <-sigs
488                 fmt.Printf("Received signal %s - application will terminate\n", sig)
489                 event_chan <- 0 // Stop periodic registration
490                 datalock.Lock()
491                 defer datalock.Unlock()
492                 AppState = Terminating
493                 http_server.Shutdown(context.Background())
494                 https_server.Shutdown(context.Background())
495                 // Stopping jobs
496         for key := range TypeJobs {
497                         log.Info("Stopping type job:", key)
498                         for _, dp := range InfoTypes.ProdDataTypes {
499                                 if key == dp.ID {
500                                         remove_type_job(dp)
501                                 }
502                         }
503                 }
504         }()
505
506         AppState = Running
507
508         //Wait until all go routines have exited
509         runtime.Goexit()
510
511         fmt.Println("main routine exit")
512         fmt.Println("server stopped")
513 }
514
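// Block until the given local TCP port accepts connections.
// Used at startup to verify that the http and https servers are listening.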
515 func check_tcp(port string) {
516         log.Info("Checking tcp port: ", port)
517         for {
518                 address := net.JoinHostPort("localhost", port)
519                 // 3 second timeout
520                 conn, err := net.DialTimeout("tcp", address, 3*time.Second)
521                 if err != nil {
522                         log.Info("Checking tcp port: ", port, " failed, retrying...")
523                 } else {
524                         if conn != nil {
525                                 log.Info("Checking tcp port: ", port, " - OK")
526                                 _ = conn.Close()
527                                 return
528                         } else {
529                                 log.Info("Checking tcp port: ", port, " failed, retrying...")
530                         }
531                 }
532         }
533 }
534
535 //== Core functions ==//
536
537 // Run periodic registration of the producer
538 func periodic_registration(evtch chan int) {
539         var delay int = 1
540         for {
541                 select {
542                 case msg := <-evtch:
543                         if msg == 0 { // Stop thread
544                                 return
545                         }
546                 case <-time.After(time.Duration(delay) * time.Second):
547                         ok := register_producer()
548                         if ok {
549                                 delay = registration_delay_long
550                         } else {
551                                 if delay < registration_delay_long {
552                                         delay += registration_delay_short
553                                 } else {
554                                         delay = registration_delay_short
555                                 }
556                         }
557                 }
558         }
559 }
560
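// Read the type configuration file, register all info types and this producer in ICS,
// then reconcile the set of running type jobs with the configuration.
// Returns true if registration succeeded, otherwise false.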
561 func register_producer() bool {
562
563         log.Info("Registering producer: ", producer_instance_name)
564
565         file, err := os.ReadFile(config_file)
566         if err != nil {
567                 log.Error("Cannot read config file: ", config_file)
568                 log.Error("Registering producer: ", producer_instance_name, " - failed")
569                 return false
570         }
571         data := DataTypes{}
572         err = jsoniter.Unmarshal([]byte(file), &data)
573         if err != nil {
574                 log.Error("Cannot parse config file: ", config_file)
575                 log.Error("Registering producer: ", producer_instance_name, " - failed")
576                 return false
577         }
578         var new_type_names []string
579
580         for i := 0; i < len(data.ProdDataTypes); i++ {
581                 t1 := make(map[string]interface{})
582                 t2 := make(map[string]interface{})
583
584                 t2["schema"] = "http://json-schema.org/draft-07/schema#"
585                 t2["title"] = data.ProdDataTypes[i].ID
586                 t2["description"] = data.ProdDataTypes[i].ID
587                 t2["type"] = "object"
588
589                 t1["info_job_data_schema"] = t2
590
591                 json, err := json.Marshal(t1)
592                 if err != nil {
593                         log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
594                         log.Error("Registering producer: ", producer_instance_name, " - failed")
595                         return false
596                 } else {
597                         ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
598                         if !ok {
599                                 log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
600                                 log.Error("Registering producer: ", producer_instance_name, " - failed")
601                                 return false
602                         }
603                         new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
604                 }
605
606         }
607
608         log.Debug("Registering types: ", new_type_names)
609         m := make(map[string]interface{})
610         m["supported_info_types"] = new_type_names
611         m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
612         m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
613
614         json, err := json.Marshal(m)
615         if err != nil {
616                 log.Error("Cannot create json for producer: ", producer_instance_name)
617                 log.Error("Registering producer: ", producer_instance_name, " - failed")
618                 return false
619         }
620         ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
621         if !ok {
622                 log.Error("Cannot register producer: ", producer_instance_name)
623                 log.Error("Registering producer: ", producer_instance_name, " - failed")
624                 return false
625         }
626         datalock.Lock()
627         defer datalock.Unlock()
628
629         var current_type_names []string
630         for _, v := range InfoTypes.ProdDataTypes {
631                 current_type_names = append(current_type_names, v.ID)
632                 if contains_str(new_type_names, v.ID) {
633                         //Type exists
634                         log.Debug("Type ", v.ID, " exists")
635                         create_ext_job(v)
636                 } else {
637                         //Type is removed
638                         log.Info("Removing type job for type: ", v.ID, " Type not in configuration")
639                         remove_type_job(v)
640                 }
641         }
642
643         for _, v := range data.ProdDataTypes {
644                 if contains_str(current_type_names, v.ID) {
645                         //Type exists
646                         log.Debug("Type ", v.ID, " exists")
647                         create_ext_job(v)
648                 } else {
649                         //Type is new
650                         log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
651                         start_type_job(v)
652                 }
653         }
654
655         InfoTypes = data
656         log.Debug("Datatypes: ", InfoTypes)
657
658         log.Info("Registering producer: ", producer_instance_name, " - OK")
659         return true
660 }
661
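// Stop the type job for the given data type and, if an external job was created in ICS,
// try to delete it.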
662 func remove_type_job(dp DataType) {
663         log.Info("Removing type job: ", dp.ID)
664         j, ok := TypeJobs[dp.ID]
665         if ok {
666                 j.reader_control <- ReaderControl{"EXIT"}
667         }
668
669         if dp.ext_job_created {
670                 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
671                 ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
672                 if !ok {
673                         log.Error("Cannot delete job: ", dp.ext_job_id)
674                 }
675                 dp.ext_job_created = false
676                 dp.ext_job = nil
677         }
678
679 }
680
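// Create and start a type job for the given data type: set up control and data channels,
// start the job handler matching the type id and a topic reader for the input topic,
// and create the external job in ICS if the type has an input job type.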
681 func start_type_job(dp DataType) {
682         log.Info("Starting type job: ", dp.ID)
683         job_record := TypeJobRecord{}
684
685         job_record.job_control = make(chan JobControl, 1)
686         job_record.reader_control = make(chan ReaderControl, 1)
687         job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
688         job_record.InfoType = dp.ID
689         job_record.InputTopic = dp.KafkaInputTopic
690         job_record.groupId = "kafka-procon-" + dp.ID
691         job_record.clientId = dp.ID + "-" + os.Getenv("KP")
692         var stats TypeJobStats
693         job_record.statistics = &stats
694
695         switch dp.ID {
696         case "xml-file-data-to-filestore":
697                 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
698         case "xml-file-data":
699                 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
700         case "json-file-data-from-filestore":
701                 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
702         case "json-file-data":
703                 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
704         case "json-file-data-from-filestore-to-influx":
705                 go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
706
707         default:
708         }
709
710         go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)
711
712         TypeJobs[dp.ID] = job_record
713         log.Debug("Type job input type: ", dp.InputJobType)
714         create_ext_job(dp)
715 }
716
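// Create an external info job in ICS for the given data type (only if it has an input job type).
// Note: the registration request is retried in a loop until it succeeds.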
717 func create_ext_job(dp DataType) {
718         if dp.InputJobType != "" {
719                 jb := make(map[string]interface{})
720                 jb["info_type_id"] = dp.InputJobType
721                 jb["job_owner"] = "console" //TODO:
722                 jb["status_notification_uri"] = "http://callback:80/post"
723                 jb1 := make(map[string]interface{})
724                 jb["job_definition"] = jb1
725                 jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic
726
727                 json, err := json.Marshal(jb)
728                 dp.ext_job_created = false
729                 dp.ext_job = nil
730                 if err != nil {
731                         log.Error("Cannot create json for type: ", dp.InputJobType)
732                         return
733                 }
734
735                 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
736                 ok := false
737                 for !ok {
738                         ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
739                         if !ok {
740                                 log.Error("Cannot register job: ", dp.InputJobType)
741                         }
742                 }
743                 log.Debug("Registered job ok: ", dp.InputJobType)
744                 dp.ext_job_created = true
745                 dp.ext_job = &json
746         }
747 }
748
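// Remove an info job: send a REMOVE-FILTER command to the owning type job and
// delete the job from the internal map.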
749 func remove_info_job(jobid string) {
750         log.Info("Removing info job: ", jobid)
751         filter := Filter{}
752         filter.JobId = jobid
753         jc := JobControl{}
754         jc.command = "REMOVE-FILTER"
755         jc.filter = filter
756         infoJob := InfoJobs[jobid]
757         typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
758         typeJob.job_control <- jc
759         delete(InfoJobs, jobid)
760
761 }
762
763 // == Helper functions ==//
764
765 // Function to check the status of a mutex lock
766 func MutexLocked(m *sync.Mutex) bool {
767         state := reflect.ValueOf(m).Elem().FieldByName("state")
768         return state.Int()&mutexLocked == mutexLocked
769 }
770
771 // Test if slice contains a string
772 func contains_str(s []string, e string) bool {
773         for _, a := range s {
774                 if a == e {
775                         return true
776                 }
777         }
778         return false
779 }
780
781 // Send a http request with json (json may be nil)
782 func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
783
784         // set the HTTP method, url, and request body
785         var req *http.Request
786         var err error
787         if json == nil {
788                 req, err = http.NewRequest(method, url, http.NoBody)
789         } else {
790                 req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
791                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
792         }
793         if err != nil {
794                 log.Error("Cannot create http request, method: ", method, " url: ", url)
795                 return false
796         }
797
798         if useAuth {
799                 token, err := fetch_token()
800                 if err != nil {
801                         log.Error("Cannot fetch token for http request: ", err)
802                         return false
803                 }
804                 req.Header.Set("Authorization", "Bearer "+token.TokenValue)
805         }
806
807         log.Debug("HTTP request: ", req)
808
809         log.Debug("Sending http request")
810         resp, err2 := httpclient.Do(req)
811         if err2 != nil {
812                 log.Error("Http request error: ", err2)
813                 log.Error("Cannot send http request method: ", method, " url: ", url)
814         } else {
815                 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
816                         log.Debug("Accepted http status: ", resp.StatusCode)
817                         resp.Body.Close()
818                         return true
819                 }
820                 log.Debug("HTTP resp: ", resp)
821                 resp.Body.Close()
822         }
823         return false
824 }
825
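// Fetch an OAuth2 access token using the client credentials flow and wrap it
// as a kafka.OAuthBearerToken, usable both for HTTP requests and Kafka clients.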
826 func fetch_token() (*kafka.OAuthBearerToken, error) {
827         log.Debug("Get token inline")
828         conf := &clientcredentials.Config{
829                 ClientID:     creds_client_id,
830                 ClientSecret: creds_client_secret,
831                 TokenURL:     creds_service_url,
832         }
833         token, err := conf.Token(context.Background())
834         if err != nil {
835                 log.Warning("Cannot fetch access token: ", err)
836                 return nil, err
837         }
838         extensions := map[string]string{}
839         log.Debug("=====================================================")
840         log.Debug("token: ", token)
841         log.Debug("=====================================================")
842         log.Debug("TokenValue: ", token.AccessToken)
843         log.Debug("=====================================================")
844         log.Debug("Expiration: ", token.Expiry)
845         t := token.Expiry
846         oauthBearerToken := kafka.OAuthBearerToken{
847                 TokenValue: token.AccessToken,
848                 Expiration: t,
849                 Extensions: extensions,
850         }
851
852         return &oauthBearerToken, nil
853 }
854
855 // Function to print memory details
856 // https://pkg.go.dev/runtime#MemStats
857 func PrintMemUsage() {
858         if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
859                 var m runtime.MemStats
860                 runtime.ReadMemStats(&m)
861                 fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
862                 fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
863                 fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
864                 fmt.Printf("\tNumGC = %v\n", m.NumGC)
865                 fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
866                 fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
867                 fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
868                 fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
869                 fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
870                 fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
871                 fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
872         }
873 }
874
875 func bToMb(b uint64) uint64 {
876         return b / 1024 / 1024
877 }
878
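// Generate a deterministic UUID from a type name by using its first 16 bytes
// (padded with '0' characters if the name is shorter than 16 bytes).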
879 func generate_uuid_from_type(s string) string {
880         if len(s) > 16 {
881                 s = s[:16]
882         }
883         for len(s) < 16 {
884                 s = s + "0"
885         }
886         b := []byte(s)
887         b = b[:16]
888         uuid, _ := uuid.FromBytes(b)
889         return uuid.String()
890 }
891
892 // Write gzipped data to a Writer
893 func gzipWrite(w io.Writer, data *[]byte) error {
894         gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
895
896         if err1 != nil {
897                 return err1
898         }
899         defer gw.Close()
900         _, err2 := gw.Write(*data)
901         return err2
902 }
903
904 // Write gunzipped data from Reader to a Writer
905 func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
906         gr, err1 := gzip.NewReader(data)
907
908         if err1 != nil {
909                 return err1
910         }
911         defer gr.Close()
912         data2, err2 := io.ReadAll(gr)
913         if err2 != nil {
914                 return err2
915         }
916         _, err3 := w.Write(data2)
917         if err3 != nil {
918                 return err3
919         }
920         return nil
921 }
922
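// Create a bucket in minio (or just record it if it already exists) and
// remember it in the per-client bucket list.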
923 func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
924         tctx := context.Background()
925         err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
926         if err != nil {
927                 // Check to see if we already own this bucket (which happens if you run this twice)
928                 exists, errBucketExists := mc.BucketExists(tctx, bucket)
929                 if errBucketExists == nil && exists {
930                         log.Debug("Already own bucket:", bucket)
931                         add_bucket(client_id, bucket)
932                         return nil
933                 } else {
934                         log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
935                         return err
936                 }
937         }
938         log.Debug("Successfully created bucket: ", bucket)
939         add_bucket(client_id, bucket)
940         return nil
941 }
942
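// Check if a bucket exists, first in the local bucket list and then in minio itself.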
943 func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
944         ok := bucket_exist(client_id, bucket)
945         if ok {
946                 return true
947         }
948         tctx := context.Background()
949         exists, err := mc.BucketExists(tctx, bucket)
950         if err == nil && exists {
951                 log.Debug("Already own bucket:", bucket)
952                 return true
953         }
954         log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
955         return false
956 }
957
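// Record a bucket as known for the given minio client id.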
958 func add_bucket(minio_id string, bucket string) {
959         datalock.Lock()
960         defer datalock.Unlock()
961
962         b, ok := minio_bucketlist[minio_id]
963         if !ok {
964                 b = Minio_buckets{}
965                 b.Buckets = make(map[string]bool)
966         }
967         b.Buckets[bucket] = true
968         minio_bucketlist[minio_id] = b
969 }
970
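// Check the local bucket list for the given minio client id and bucket.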
971 func bucket_exist(minio_id string, bucket string) bool {
972         datalock.Lock()
973         defer datalock.Unlock()
974
975         b, ok := minio_bucketlist[minio_id]
976         if !ok {
977                 return false
978         }
979         _, ok = b.Buckets[bucket]
980         return ok
981 }
982
983 //== http api functions ==//
984
985 // create/update job
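// Illustrative sketch (not taken from the repo) of a job creation payload, based on the
// InfoJobDataType struct above - job id, topics and filter values are placeholders:
//
//   {
//     "info_job_identity": "example-job-1",
//     "info_type_identity": "json-file-data-from-filestore",
//     "owner": "console",
//     "target_uri": "",
//     "last_updated": "",
//     "info_job_data": {
//       "kafkaOutputTopic": "example-output-topic",
//       "filter": { "measTypes": ["example-meas-type"] }
//     }
//   }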
986 func create_job(w http.ResponseWriter, req *http.Request) {
987         log.Debug("Create job, http method: ", req.Method)
988         if req.Method != http.MethodPost {
989                 log.Error("Create job, http method not allowed")
990                 w.WriteHeader(http.StatusMethodNotAllowed)
991                 return
992         }
993         ct := req.Header.Get("Content-Type")
994         if ct != "application/json" {
995                 log.Error("Create job, bad content type")
996                 http.Error(w, "Bad content type", http.StatusBadRequest)
997                 return
998         }
999
1000         var t InfoJobDataType
1001         err := json.NewDecoder(req.Body).Decode(&t)
1002         if err != nil {
1003                 log.Error("Create job, cannot parse json,", err)
1004                 http.Error(w, "Cannot parse json", http.StatusBadRequest)
1005                 return
1006         }
1007         log.Debug("Creating job, id: ", t.InfoJobIdentity)
1008         datalock.Lock()
1009         defer datalock.Unlock()
1010
1011         job_id := t.InfoJobIdentity
1012         job_record, job_found := InfoJobs[job_id]
1013         type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
1014         if !job_found {
1015                 if !found_type {
1016                         log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1017                         http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1018                         return
1019                 }
1020         } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
1021                 log.Error("Job cannot change type")
1022                 http.Error(w, "Job cannot change type", http.StatusBadRequest)
1023                 return
1024         } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
1025                 log.Error("Job cannot change topic")
1026                 http.Error(w, "Job cannot change topic", http.StatusBadRequest)
1027                 return
1028         } else if !found_type {
1029                 //Should never happen - if the type is removed then the job is stopped
1030                 log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1031                 http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1032                 return
1033         }
1034
1035         if !job_found {
1036                 job_record = InfoJobRecord{}
1037                 job_record.job_info = t
1038                 output_topic := t.InfoJobData.KafkaOutputTopic
1039                 job_record.output_topic = t.InfoJobData.KafkaOutputTopic
1040                 log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic)
1041
1042                 var stats InfoJobStats
1043                 job_record.statistics = &stats
1044
1045                 filter := Filter{}
1046                 filter.JobId = job_id
1047                 filter.OutputTopic = job_record.output_topic
1048
1049                 jc := JobControl{}
1050
1051                 jc.command = "ADD-FILTER"
1052
1053                 if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1054                         fm := FilterMaps{}
1055                         fm.sourceNameMap = make(map[string]bool)
1056                         fm.measObjClassMap = make(map[string]bool)
1057                         fm.measObjInstIdsMap = make(map[string]bool)
1058                         fm.measTypesMap = make(map[string]bool)
1059                         if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
1060                                 for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
1061                                         fm.sourceNameMap[v] = true
1062                                 }
1063                         }
1064                         if t.InfoJobData.FilterParams.MeasObjClass != nil {
1065                                 for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
1066                                         fm.measObjClassMap[v] = true
1067                                 }
1068                         }
1069                         if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
1070                                 for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
1071                                         fm.measObjInstIdsMap[v] = true
1072                                 }
1073                         }
1074                         if t.InfoJobData.FilterParams.MeasTypes != nil {
1075                                 for _, v := range t.InfoJobData.FilterParams.MeasTypes {
1076                                         fm.measTypesMap[v] = true
1077                                 }
1078                         }
1079                         filter.filter = fm
1080                 }
1081                 if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1082                         influxparam := InfluxJobParameters{}
1083                         influxparam.DbUrl = t.InfoJobData.DbUrl
1084                         influxparam.DbOrg = t.InfoJobData.DbOrg
1085                         influxparam.DbBucket = t.InfoJobData.DbBucket
1086                         influxparam.DbToken = t.InfoJobData.DbToken
1087                         filter.influxParameters = influxparam
1088                 }
1089
1090                 jc.filter = filter
1091                 InfoJobs[job_id] = job_record
1092
1093                 type_job_record.job_control <- jc
1094
1095         } else {
1096                 //TODO
1097                 //Update job
1098         }
1099 }
1100
1101 // delete job
1102 func delete_job(w http.ResponseWriter, req *http.Request) {
1103         if req.Method != http.MethodDelete {
1104                 w.WriteHeader(http.StatusMethodNotAllowed)
1105                 return
1106         }
1107         datalock.Lock()
1108         defer datalock.Unlock()
1109
1110         vars := mux.Vars(req)
1111
1112         if id, ok := vars["job_id"]; ok {
1113                 if _, ok := InfoJobs[id]; ok {
1114                         remove_info_job(id)
1115                         w.WriteHeader(http.StatusNoContent)
1116                         log.Info("Job ", id, " deleted")
1117                         return
1118                 }
1119         }
1120         w.WriteHeader(http.StatusNotFound)
1121 }
1122
1123 // job supervision
1124 func supervise_job(w http.ResponseWriter, req *http.Request) {
1125         if req.Method != http.MethodGet {
1126                 w.WriteHeader(http.StatusMethodNotAllowed)
1127                 return
1128         }
1129         datalock.Lock()
1130         defer datalock.Unlock()
1131
1132         vars := mux.Vars(req)
1133
1134         log.Debug("Supervising, job: ", vars["job_id"])
1135         if id, ok := vars["job_id"]; ok {
1136                 if _, ok := InfoJobs[id]; ok {
1137                         log.Debug("Supervision ok, job ", id)
1138                         return
1139                 }
1140         }
1141         w.WriteHeader(http.StatusNotFound)
1142 }
1143
1144 // producer supervision
1145 func supervise_producer(w http.ResponseWriter, req *http.Request) {
1146         if req.Method != http.MethodGet {
1147                 w.WriteHeader(http.StatusMethodNotAllowed)
1148                 return
1149         }
1150
1151         w.WriteHeader(http.StatusOK)
1152 }
1153
1154 // producer statistics, all jobs
1155 func statistics(w http.ResponseWriter, req *http.Request) {
1156         if req.Method != http.MethodGet {
1157                 w.WriteHeader(http.StatusMethodNotAllowed)
1158                 return
1159         }
1160         m := make(map[string]interface{})
1161         log.Debug("producer statistics, locked? ", MutexLocked(&datalock))
1162         datalock.Lock()
1163         defer datalock.Unlock()
1164         w.Header().Set("Content-Type", "application/json; charset=utf-8")
1165         m["number-of-jobs"] = len(InfoJobs)
1166         m["number-of-types"] = len(InfoTypes.ProdDataTypes)
1167         qm := make(map[string]interface{})
1168         m["jobs"] = qm
1169         for key, elem := range InfoJobs {
1170                 jm := make(map[string]interface{})
1171                 qm[key] = jm
1172                 jm["type"] = elem.job_info.InfoTypeIdentity
1173                 typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
1174                 jm["groupId"] = typeJob.groupId
1175                 jm["clientID"] = typeJob.clientId
1176                 jm["input topic"] = typeJob.InputTopic
1177                 jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
1178                 jm["output topic"] = elem.output_topic
1179                 jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
1180                 jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
1181                 jm["msg_out (job)"] = elem.statistics.out_msg_cnt
1182
1183         }
1184         json, err := json.Marshal(m)
1185         if err != nil {
1186                 w.WriteHeader(http.StatusInternalServerError)
1187                 log.Error("Cannot marshal statistics json")
1188                 return
1189         }
1190         _, err = w.Write(json)
1191         if err != nil {
1192                 w.WriteHeader(http.StatusInternalServerError)
1193                 log.Error("Cannot send statistics json")
1194                 return
1195         }
1196 }
1197
1198 // Simple alive check
1199 func alive(w http.ResponseWriter, req *http.Request) {
1200         //Alive check
1201 }
1202
1203 // Get/Set logging level
1204 func logging_level(w http.ResponseWriter, req *http.Request) {
1205         vars := mux.Vars(req)
1206         if level, ok := vars["level"]; ok {
1207                 if req.Method == http.MethodPut {
1208                         switch level {
1209                         case "trace":
1210                                 log.SetLevel(log.TraceLevel)
1211                         case "debug":
1212                                 log.SetLevel(log.DebugLevel)
1213                         case "info":
1214                                 log.SetLevel(log.InfoLevel)
1215                         case "warn":
1216                                 log.SetLevel(log.WarnLevel)
1217                         case "error":
1218                                 log.SetLevel(log.ErrorLevel)
1219                         case "fatal":
1220                                 log.SetLevel(log.FatalLevel)
1221                         case "panic":
1222                                 log.SetLevel(log.PanicLevel)
1223                         default:
1224                                 w.WriteHeader(http.StatusNotFound)
1225                         }
1226                 } else {
1227                         w.WriteHeader(http.StatusMethodNotAllowed)
1228                 }
1229         } else {
1230                 if req.Method == http.MethodGet {
1231                         msg := "none"
1232                         if log.IsLevelEnabled(log.PanicLevel) {
1233                                 msg = "panic"
1234                         } else if log.IsLevelEnabled(log.FatalLevel) {
1235                                 msg = "fatal"
1236                         } else if log.IsLevelEnabled(log.ErrorLevel) {
1237                                 msg = "error"
1238                         } else if log.IsLevelEnabled(log.WarnLevel) {
1239                                 msg = "warn"
1240                         } else if log.IsLevelEnabled(log.InfoLevel) {
1241                                 msg = "info"
1242                         } else if log.IsLevelEnabled(log.DebugLevel) {
1243                                 msg = "debug"
1244                         } else if log.IsLevelEnabled(log.TraceLevel) {
1245                                 msg = "trace"
1246                         }
1247                         w.Header().Set("Content-Type", "text/plain")
1248                         w.Write([]byte(msg))
1249                 } else {
1250                         w.WriteHeader(http.StatusMethodNotAllowed)
1251                 }
1252         }
1253 }
1254
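// Start a reader for the given kafka input topic. The reader creates a consumer (retrying
// until it succeeds), subscribes to the topic and then forwards each received message to
// the data channel. An EXIT command on the control channel stops the reader.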
1255 func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
1256
1257         log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
1258
1259         topic_ok := false
1260         var c *kafka.Consumer
1261         running := true
1262
1263         for !topic_ok {
1264
1265                 select {
1266                 case reader_ctrl := <-control_ch:
1267                         if reader_ctrl.command == "EXIT" {
1268                                 log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1269                                 data_ch <- nil //Signal to job handler
1270                                 running = false
1271                                 return
1272                         }
1273                 case <-time.After(1 * time.Second):
1274                         if !running {
1275                                 return
1276                         }
1277                         if c == nil {
1278                                 c = create_kafka_consumer(type_id, gid, cid)
1279                                 if c == nil {
1280                                         log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
1281                                 } else {
1282                                         log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
1283                                 }
1284                         }
1285                         if c != nil && !topic_ok {
1286                                 err := c.SubscribeTopics([]string{topic}, nil)
1287                                 if err != nil {
1288                                         log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying --  error details: ", err)
1289                                 } else {
1290                                         log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
1291                                         topic_ok = true
1292                                 }
1293                         }
1294                 }
1295         }
1296         log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
1297
1298         var event_chan = make(chan int)
1299         go func() {
1300                 for {
1301                         select {
1302                         case evt := <-c.Events():
1303                                 switch evt.(type) {
1304                                 case kafka.OAuthBearerTokenRefresh:
1305                                         log.Debug("New consumer token needed: ", evt)
1306                                         token, err := fetch_token()
1307                                         if err != nil {
1308                                                 log.Warning("Cannot fetch token: ", err)
1309                                                 c.SetOAuthBearerTokenFailure(err.Error())
1310                                         } else {
1311                                                 setTokenError := c.SetOAuthBearerToken(*token)
1312                                                 if setTokenError != nil {
1313                                                         log.Warning("Cannot set token: ", setTokenError)
1314                                                         c.SetOAuthBearerTokenFailure(setTokenError.Error())
1315                                                 }
1316                                         }
1317                                 default:
1318                                         log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
1319                                 }
1320
1321                         case msg := <-event_chan:
1322                                 if msg == 0 {
1323                                         return
1324                                 }
1325                         case <-time.After(1 * time.Second):
1326                                 if !running {
1327                                         return
1328                                 }
1329                         }
1330                 }
1331         }()
1332
1333         go func() {
1334                 for {
1335                         for {
1336                                 select {
1337                                 case reader_ctrl := <-control_ch:
1338                                         if reader_ctrl.command == "EXIT" {
1339                                                 event_chan <- 0
1340                                                 log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1341                                                 data_ch <- nil //Signal to job handler
1342                                                 defer c.Close()
1343                                                 return
1344                                         }
1345                                 default:
1346
1347                                         ev := c.Poll(1000)
1348                                         if ev == nil {
1349                                                 log.Debug("Topic reader for type: ", type_id, " - nothing to consume on topic: ", topic)
1350                                                 continue
1351                                         }
1352                                         switch e := ev.(type) {
1353                                         case *kafka.Message:
1354                                                 var kmsg KafkaPayload
1355                                                 kmsg.msg = e
1356
1357                                                 c.Commit()
1358
1359                                                 data_ch <- &kmsg
1360                                                 stats.in_msg_cnt++
1361                                                 log.Debug("Reader msg: ", &kmsg)
1362                                                 log.Debug("Reader - data_ch ", data_ch)
1363                                         case kafka.Error:
1364                                                 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
1365
1366                                         case kafka.OAuthBearerTokenRefresh:
1367                                                 log.Debug("New consumer token needed: ", ev)
1368                                                 token, err := fetch_token()
1369                                                 if err != nil {
1370                                                         log.Warning("Cannot fetch token: ", err)
1371                                                         c.SetOAuthBearerTokenFailure(err.Error())
1372                                                 } else {
1373                                                         setTokenError := c.SetOAuthBearerToken(*token)
1374                                                         if setTokenError != nil {
1375                                                                 log.Warning("Cannot set token: ", setTokenError)
1376                                                                 c.SetOAuthBearerTokenFailure(setTokenError.Error())
1377                                                         }
1378                                                 }
1379                                         default:
1380                                                 fmt.Printf("Ignored %v\n", e)
1381                                         }
1382                                 }
1383                         }
1384                 }
1385         }()
1386 }
1387
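// start_topic_writer waits until a kafka producer can be created and then forwards
// KafkaPayload messages from data_ch to their target topics, retrying failed sends
// with an increasing delay before discarding a message. A nil message on data_ch
// stops the writer.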
1388 func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
1389
1390         var kafka_producer *kafka.Producer
1391
1392         running := true
1393         log.Info("Topic writer starting")
1394
1395         // Wait for kafka producer to become available - and be prepared to exit the writer
1396         for kafka_producer == nil {
1397                 select {
1398                 case writer_ctl := <-control_ch:
1399                         if writer_ctl.command == "EXIT" {
1400                                 //ignore cmd
1401                         }
1402                 default:
1403                         kafka_producer = start_producer()
1404                         if kafka_producer == nil {
1405                                 log.Debug("Could not start kafka producer - retrying")
1406                                 time.Sleep(1 * time.Second)
1407                         } else {
1408                                 log.Debug("Kafka producer started")
1409                         }
1410                 }
1411         }
1412
1413         var event_chan = make(chan int)
1414         go func() {
1415                 for {
1416                         select {
1417                         case evt := <-kafka_producer.Events():
1418                                 switch evt.(type) {
1419                                 case *kafka.Message:
1420                                         m := evt.(*kafka.Message)
1421
1422                                         if m.TopicPartition.Error != nil {
1423                                                 log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
1424                                         } else {
1425                                                 log.Debug("Dumping topic writer event,  message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
1426                                         }
1427                                 case kafka.Error:
1428                                         log.Debug("Dumping topic writer event, error: ", evt)
1429                                 case kafka.OAuthBearerTokenRefresh:
1430                                         log.Debug("New producer token needed: ", evt)
1431                                         token, err := fetch_token()
1432                                         if err != nil {
1433                                                 log.Warning("Cannot fetch token: ", err)
1434                                                 kafka_producer.SetOAuthBearerTokenFailure(err.Error())
1435                                         } else {
1436                                                 setTokenError := kafka_producer.SetOAuthBearerToken(*token)
1437                                                 if setTokenError != nil {
1438                                                         log.Warning("Cannot set token: ", setTokenError)
1439                                                         kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
1440                                                 }
1441                                         }
1442                                 default:
1443                                         log.Debug("Dumping topic writer event, unknown: ", evt)
1444                                 }
1445
1446                         case msg := <-event_chan:
1447                                 if msg == 0 {
1448                                         return
1449                                 }
1450                         case <-time.After(1 * time.Second):
1451                                 if !running {
1452                                         return
1453                                 }
1454                         }
1455                 }
1456         }()
1457         go func() {
1458                 for {
1459                         select {
1460                         case writer_ctl := <-control_ch:
1461                                 if writer_ctl.command == "EXIT" {
1462                                         // ignore - wait for channel signal
1463                                 }
1464
1465                         case kmsg := <-data_ch:
1466                                 if kmsg == nil {
1467                                         event_chan <- 0
1468                                         log.Info("Topic writer stopped by channel signal - start_topic_writer")
1469                                         defer kafka_producer.Close()
1470                                         return
1471                                 }
1472
1473                                 retries := 10
1474                                 msg_ok := false
1475                                 var err error
1476                                 for retry := 1; retry <= retries && !msg_ok; retry++ {
1477                                         err = kafka_producer.Produce(&kafka.Message{
1478                                                 TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
1479                                                 Value:          kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
1480
1481                                         if err == nil {
1482                                                 incr_out_msg_cnt(kmsg.jobid)
1483                                                 msg_ok = true
1484                                                 log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
1485                                         } else {
1486                                                 log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
1487                                                 time.Sleep(time.Duration(retry) * time.Second)
1488                                         }
1489                                 }
1490                                 if !msg_ok {
1491                                         log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
1492                                 }
1493                         case <-time.After(1000 * time.Millisecond):
1494                                 if !running {
1495                                         return
1496                                 }
1497                         }
1498                 }
1499         }()
1500 }
1501
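// create_kafka_consumer creates a kafka consumer for the given type, group id and
// client id. SASL_PLAINTEXT with OAUTHBEARER is configured only when CREDS_GRANT_TYPE
// is set; auto commit is disabled so that offsets are committed explicitly by the reader.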
1502 func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
1503         var cm kafka.ConfigMap
1504         if creds_grant_type == "" {
1505                 log.Info("Creating kafka plain text consumer for type: ", type_id)
1506                 cm = kafka.ConfigMap{
1507                         "bootstrap.servers":  bootstrapserver,
1508                         "group.id":           gid,
1509                         "client.id":          cid,
1510                         "auto.offset.reset":  "latest",
1511                         "enable.auto.commit": false,
1512                 }
1513         } else {
1514                 log.Info("Creating kafka SASL_PLAINTEXT OAUTHBEARER consumer for type: ", type_id)
1515                 cm = kafka.ConfigMap{
1516                         "bootstrap.servers":  bootstrapserver,
1517                         "group.id":           gid,
1518                         "client.id":          cid,
1519                         "auto.offset.reset":  "latest",
1520                         "enable.auto.commit": false,
1521                         "sasl.mechanism":     "OAUTHBEARER",
1522                         "security.protocol":  "SASL_PLAINTEXT",
1523                 }
1524         }
1525         c, err := kafka.NewConsumer(&cm)
1526
1527         if err != nil {
1528                 log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
1529                 return nil
1530         }
1531
1532         log.Info("Created kafka consumer for type: ", type_id, " OK")
1533         return c
1534 }
1535
1536 // Start kafka producer
1537 func start_producer() *kafka.Producer {
1538         log.Info("Creating kafka producer")
1539
1540         var cm kafka.ConfigMap
1541         if creds_grant_type == "" {
1542                 log.Info("Creating kafka plain text producer")
1543                 cm = kafka.ConfigMap{
1544                         "bootstrap.servers": bootstrapserver,
1545                 }
1546         } else {
1547                 log.Info("Creating kafka SASL_PLAINTEXT OAUTHBEARER producer")
1548                 cm = kafka.ConfigMap{
1549                         "bootstrap.servers": bootstrapserver,
1550                         "sasl.mechanism":    "OAUTHBEARER",
1551                         "security.protocol": "SASL_PLAINTEXT",
1552                 }
1553         }
1554
1555         p, err := kafka.NewProducer(&cm)
1556         if err != nil {
1557                 log.Error("Cannot create kafka producer,", err)
1558                 return nil
1559         }
1560         return p
1561 }
1562
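// start_adminclient creates a kafka admin client towards the configured bootstrap server.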
1563 func start_adminclient() *kafka.AdminClient {
1564         log.Info("Creating kafka admin client")
1565         a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
1566         if err != nil {
1567                 log.Error("Cannot create kafka admin client,", err)
1568                 return nil
1569         }
1570         return a
1571 }
1572
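// create_minio_client creates a minio client towards the configured filestore using
// static credentials. The error is returned as a pointer, nil on success.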
1573 func create_minio_client(id string) (*minio.Client, *error) {
1574         log.Debug("Get minio client")
1575         minio_client, err := minio.New(filestore_server, &minio.Options{
1576                 Secure: false,
1577                 Creds:  credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
1578         })
1579         if err != nil {
1580                 log.Error("Cannot create minio client, ", err)
1581                 return nil, &err
1582         }
1583         return minio_client, nil
1584 }
1585
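// incr_out_msg_cnt increments the outgoing message counter for the given job, if the job exists.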
1586 func incr_out_msg_cnt(jobid string) {
1587         j, ok := InfoJobs[jobid]
1588         if ok {
1589                 j.statistics.out_msg_cnt++
1590         }
1591 }
1592
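// start_job_xml_file_data handles a type job for xml file events. It maintains the
// per-job filters and output topics received on control_ch and spawns one run_xml_job
// goroutine per incoming file event, limited by jobLimiterChan.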
1593 func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
1594
1595         log.Info("Type job ", type_id, " started")
1596
1597         filters := make(map[string]Filter)
1598         topic_list := make(map[string]string)
1599         var mc *minio.Client
1600         const mc_id = "mc_" + "start_job_xml_file_data"
1601         running := true
1602         for {
1603                 select {
1604                 case job_ctl := <-control_ch:
1605                         log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
1606                         switch job_ctl.command {
1607                         case "EXIT":
1608                                 //ignore cmd - handled by channel signal
1609                         case "ADD-FILTER":
1610                                 filters[job_ctl.filter.JobId] = job_ctl.filter
1611                                 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
1612
1613                                 tmp_topic_list := make(map[string]string)
1614                                 for k, v := range topic_list {
1615                                         tmp_topic_list[k] = v
1616                                 }
1617                                 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
1618                                 topic_list = tmp_topic_list
1619                         case "REMOVE-FILTER":
1620                                 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
1621
1622                                 tmp_topic_list := make(map[string]string)
1623                                 for k, v := range topic_list {
1624                                         tmp_topic_list[k] = v
1625                                 }
1626                                 delete(tmp_topic_list, job_ctl.filter.JobId)
1627                                 topic_list = tmp_topic_list
1628                         }
1629
1630                 case msg := <-data_in_ch:
1631                         if msg == nil {
1632                                 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
1633
1634                                 running = false
1635                                 return
1636                         }
1637                         if fsbucket != "" && fvolume == "" {
1638                                 if mc == nil {
1639                                         var err *error
1640                                         mc, err = create_minio_client(mc_id)
1641                                         if err != nil {
1642                                                 log.Debug("Cannot create minio client for type job: ", type_id)
1643                                         }
1644                                 }
1645                         }
1646                         jobLimiterChan <- struct{}{}
1647                         go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
1648
1649                 case <-time.After(1 * time.Second):
1650                         if !running {
1651                                 return
1652                         }
1653                 }
1654         }
1655 }
1656
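// run_xml_job reads one PM xml file (from the file volume or the filestore), converts
// it to the json format, optionally gzips the result, stores it on the configured
// output (volume or bucket) and publishes a FileDownloadedEvt to all job output topics.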
1657 func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
1658         defer func() {
1659                 <-jobLimiterChan
1660         }()
1661         PrintMemUsage()
1662
1663         if fvolume == "" && fsbucket == "" {
1664                 log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
1665                 return
1666         } else if (fvolume != "") && (fsbucket != "") {
1667                 log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
1668                 return
1669         }
1670
1671         start := time.Now()
1672         var evt_data XmlFileEventHeader
1673
1674         err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
1675         if err != nil {
1676                 log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
1677                 return
1678         }
1679         log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
1680
1681         var reader io.Reader
1682
1683         INPUTBUCKET := "ropfiles"
1684
1685         filename := ""
1686         if fvolume != "" {
1687                 filename = fvolume + "/" + evt_data.Name
1688                 fi, err := os.Open(filename)
1689
1690                 if err != nil {
1691                         log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
1692                         return
1693                 }
1694                 defer fi.Close()
1695                 reader = fi
1696         } else {
1697                 filename = evt_data.Name
1698                 if mc != nil {
1699                         tctx := context.Background()
1700                         mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
1701                         if err != nil {
1702                                 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
1703                                 return
1704                         }
1705                         if mr == nil {
1706                                 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned nil reader - discarding message")
1707                                 return
1708                         }
1709                         reader = mr
1710                         defer mr.Close()
1711                 } else {
1712                         log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client -  discarding message")
1713                         return
1714                 }
1715         }
1716
1717         if reader == nil {
1718                 log.Error("Cannot get: ", filename, " - null reader")
1719                 return
1720         }
1721         var file_bytes []byte
1722         if strings.HasSuffix(filename, "gz") {
1723                 start := time.Now()
1724                 var buf3 bytes.Buffer
1725                 errb := gunzipReaderToWriter(&buf3, reader)
1726                 if errb != nil {
1727                         log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
1728                         return
1729                 }
1730                 file_bytes = buf3.Bytes()
1731                 log.Debug("Gunzip file: ", filename, " time: ", time.Since(start).String(), " len: ", len(file_bytes))
1732
1733         } else {
1734                 var buf3 bytes.Buffer
1735                 _, err2 := io.Copy(&buf3, reader)
1736                 if err2 != nil {
1737                         log.Error("File ", filename, " - cannot be read, discarding message, ", err2)
1738                         return
1739                 }
1740                 file_bytes = buf3.Bytes()
1741         }
1742         start = time.Now()
1743         b, err := xml_to_json_conv(&file_bytes, &evt_data)
1744         if err != nil {
1745                 log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
1746                 return
1747         }
1748         log.Debug("Converted file to json: ", filename, " time: ", time.Since(start).String(), " len: ", len(b))
1749
1750         new_fn := evt_data.Name + os.Getenv("KP") + ".json"
1751         if outputCompression == "gz" {
1752                 new_fn = new_fn + ".gz"
1753                 start = time.Now()
1754                 var buf bytes.Buffer
1755                 err = gzipWrite(&buf, &b)
1756                 if err != nil {
1757                         log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
1758                         return
1759                 }
1760                 b = buf.Bytes()
1761                 log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), " len: ", len(b))
1762
1763         }
1764         start = time.Now()
1765
1766         if fvolume != "" {
1767                 //Store on disk
1768                 err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
1769                 if err != nil {
1770                         log.Error("Cannot write file ", new_fn, " - discarding message,", err)
1771                         return
1772                 }
1773                 log.Debug("Write file to disk: "+new_fn, " time: ", time.Since(start).String(), " len: ", len(b))
1774         } else if fsbucket != "" {
1775                 // Store in minio
1776                 objectName := new_fn
1777                 if mc != nil {
1778
1779                         contentType := "application/json"
1780                         if strings.HasSuffix(objectName, ".gz") {
1781                                 contentType = "application/gzip"
1782                         }
1783
1784                         // Upload the converted json file with PutObject
1785                         r := bytes.NewReader(b)
1786                         tctx := context.Background()
1787                         if !check_minio_bucket(mc, mc_id, fsbucket) {
1788                                 err := create_minio_bucket(mc, mc_id, fsbucket)
1789                                 if err != nil {
1790                                         log.Error("Cannot create bucket: ", fsbucket, ", ", err)
1791                                         return
1792                                 }
1793                         }
1794                         ok := false
1795                         for i := 1; i < 64 && !ok; i = i * 2 {
1796                                 info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
1797                                 if err != nil {
1798
1799                                         if i == 1 {
1800                                                 log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
1801                                         } else {
1802                                                 log.Warn("Cannot upload (retry): ", objectName, ", ", err)
1803                                         }
1804                                         time.Sleep(time.Duration(i) * time.Second)
1805                                 } else {
1806                                         log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
1807                                         log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
1808                                         ok = true
1809                                 }
1810                         }
1811                         if !ok {
1812                                 log.Error("Cannot upload : ", objectName, ", ", err)
1813                         }
1814                 } else {
1815                         log.Error("Cannot upload: ", objectName, ", no client")
1816                 }
1817         }
1818
1819         start = time.Now()
1820         // The same FileDownloadedEvt is published regardless of output destination
1821         var fde FileDownloadedEvt
1822         fde.Filename = new_fn
1823         j, err := jsoniter.Marshal(fde)
1824
1825         if err != nil {
1826                 log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
1827                 return
1828         }
1829         msg.msg.Value = j
1841         msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
1842         log.Debug("Marshal file-collect event ", time.Since(start).String())
1843
1844         for k, v := range topic_list {
1845                 var kmsg *KafkaPayload = new(KafkaPayload)
1846                 kmsg.msg = msg.msg
1847                 kmsg.topic = v
1848                 kmsg.jobid = k
1849                 data_out_channel <- kmsg
1850         }
1851 }
1852
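// xml_to_json_conv unmarshals a 3GPP measurement xml file and converts it to the
// json PM file format. Fields not present in the xml event header are left empty
// (see the TODO below).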
1853 func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
1854         var f MeasCollecFile
1855         start := time.Now()
1856         err := xml.Unmarshal(*f_byteValue, &f)
1857         if err != nil {
1858                 return nil, errors.New("Cannot unmarshal xml-file")
1859         }
1860         log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
1861
1862         start = time.Now()
1863         var pmfile PMJsonFile
1864
1865         pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
1866         pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
1867         pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
1868         pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
1869         pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
1870
1871         for _, it := range f.MeasData.MeasInfo {
1872                 var mili MeasInfoList
1873                 mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
1874                 for _, jt := range it.MeasType {
1875                         mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
1876                 }
1877                 for _, jt := range it.MeasValue {
1878                         var mv MeasValues
1879                         mv.MeasObjInstID = jt.MeasObjLdn
1880                         mv.SuspectFlag = jt.Suspect
1881                         if jt.Suspect == "" {
1882                                 mv.SuspectFlag = "false"
1883                         }
1884                         for _, kt := range jt.R {
1885                                 ni, _ := strconv.Atoi(kt.P)
1886                                 nv := kt.Text
1887                                 mr := MeasResults{ni, nv}
1888                                 mv.MeasResultsList = append(mv.MeasResultsList, mr)
1889                         }
1890                         mili.MeasValuesList = append(mili.MeasValuesList, mv)
1891                 }
1892
1893                 pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
1894         }
1895
1898         //TODO: Fill more values
1899         pmfile.Event.CommonEventHeader.Domain = ""    //xfeh.Domain
1900         pmfile.Event.CommonEventHeader.EventID = ""   //xfeh.EventID
1901         pmfile.Event.CommonEventHeader.Sequence = 0   //xfeh.Sequence
1902         pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
1903         pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
1904         pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
1905         pmfile.Event.CommonEventHeader.Priority = ""            //xfeh.Priority
1906         pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
1907         pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
1908         pmfile.Event.CommonEventHeader.Version = ""                 //xfeh.Version
1909         pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
1910         pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
1911
1912         log.Debug("Convert xml to json : ", time.Since(start).String())
1913
1914         start = time.Now()
1915         json, err := jsoniter.Marshal(pmfile)
1916         log.Debug("Marshal json : ", time.Since(start).String())
1917
1918         if err != nil {
1919                 return nil, errors.New("Cannot marshal converted json")
1920         }
1921         return json, nil
1922 }
1923
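// start_job_json_file_data handles a type job for json PM files. It maintains the
// per-job filters and output topics received on control_ch and spawns one run_json_job
// goroutine per incoming file event, limited by jobLimiterChan.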
1924 func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
1925
1926         log.Info("Type job ", type_id, " started")
1927
1928         filters := make(map[string]Filter)
1929         filterParams_list := make(map[string]FilterMaps)
1930         topic_list := make(map[string]string)
1931         var mc *minio.Client
1932         const mc_id = "mc_" + "start_job_json_file_data"
1933         running := true
1934         for {
1935                 select {
1936                 case job_ctl := <-control_ch:
1937                         log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
1938                         switch job_ctl.command {
1939                         case "EXIT":
1940                         case "ADD-FILTER":
1941                                 filters[job_ctl.filter.JobId] = job_ctl.filter
1942                                 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
1943
1944                                 tmp_filterParams_list := make(map[string]FilterMaps)
1945                                 for k, v := range filterParams_list {
1946                                         tmp_filterParams_list[k] = v
1947                                 }
1948                                 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
1949                                 filterParams_list = tmp_filterParams_list
1950
1951                                 tmp_topic_list := make(map[string]string)
1952                                 for k, v := range topic_list {
1953                                         tmp_topic_list[k] = v
1954                                 }
1955                                 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
1956                                 topic_list = tmp_topic_list
1957                         case "REMOVE-FILTER":
1958                                 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
1959
1960                                 tmp_filterParams_list := make(map[string]FilterMaps)
1961                                 for k, v := range filterParams_list {
1962                                         tmp_filterParams_list[k] = v
1963                                 }
1964                                 delete(tmp_filterParams_list, job_ctl.filter.JobId)
1965                                 filterParams_list = tmp_filterParams_list
1966
1967                                 tmp_topic_list := make(map[string]string)
1968                                 for k, v := range topic_list {
1969                                         tmp_topic_list[k] = v
1970                                 }
1971                                 delete(tmp_topic_list, job_ctl.filter.JobId)
1972                                 topic_list = tmp_topic_list
1973                         }
1974
1975                 case msg := <-data_in_ch:
1976                         if msg == nil {
1977                                 log.Info("Type job ", type_id, " stopped by channel signal - start_job_json_file_data")
1978
1979                                 running = false
1980                                 return
1981                         }
1982                         if objectstore {
1983                                 if mc == nil {
1984                                         var err *error
1985                                         mc, err = create_minio_client(mc_id)
1986                                         if err != nil {
1987                                                 log.Debug("Cannot create minio client for type job: ", type_id)
1988                                         }
1989                                 }
1990                         }
1991                         //TODO: Sort processed file conversions in order (FIFO)
1992                         jobLimiterChan <- struct{}{}
1993                         go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
1994
1995                 case <-time.After(1 * time.Second):
1996                         if !running {
1997                                 return
1998                         }
1999                 }
2000         }
2001 }
2002
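// run_json_job reads one json PM file (from the file volume or the filestore bucket),
// applies each job's filter and publishes the filtered result to the job's output topic.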
2003 func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2004
2005         //Release job limit
2006         defer func() {
2007                 <-jobLimiterChan
2008         }()
2009
2010         PrintMemUsage()
2011
2012         var evt_data FileDownloadedEvt
2013         err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2014         if err != nil {
2015                 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2016                 return
2017         }
2018         log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2019
2020         var reader io.Reader
2021
2022         INPUTBUCKET := "pm-files-json"
2023         filename := ""
2024         if !objectstore {
2025                 filename = files_volume + "/" + evt_data.Filename
2026                 fi, err := os.Open(filename)
2027
2028                 if err != nil {
2029                         log.Error("File: ", filename, " for type job: ", type_id, " - cannot be opened - discarding message - error details: ", err)
2030                         return
2031                 }
2032                 defer fi.Close()
2033                 reader = fi
2034         } else {
2035                 filename = "/" + evt_data.Filename
2036                 if mc != nil {
2037                         if !check_minio_bucket(mc, mc_id, INPUTBUCKET) {
2038                                 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2039                                 return
2040                         }
2041                         tctx := context.Background()
2042                         mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2043                         if err != nil {
2044                                 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2045                                 return
2046                         }
2047                         reader = mr
2048                         defer mr.Close()
2049                 } else {
2050                         log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2051                         return
2052                 }
2053         }
2054
2055         var data *[]byte
2056         if strings.HasSuffix(filename, "gz") {
2057                 start := time.Now()
2058                 var buf2 bytes.Buffer
2059                 errb := gunzipReaderToWriter(&buf2, reader)
2060                 if errb != nil {
2061                         log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2062                         return
2063                 }
2064                 d := buf2.Bytes()
2065                 data = &d
2066                 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2067         } else {
2068
2069                 start := time.Now()
2070                 d, err := io.ReadAll(reader)
2071                 if err != nil {
2072                         log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2073                         return
2074                 }
2075                 data = &d
2076
2077                 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2078         }
2079
2080         for k, v := range filterList {
2081
2082                 var pmfile PMJsonFile
2083                 start := time.Now()
2084                 err = jsoniter.Unmarshal(*data, &pmfile)
2085                 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2086
2087                 if err != nil {
2088                         log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2089                         return
2090                 }
2091
2092                 var kmsg *KafkaPayload = new(KafkaPayload)
2093                 kmsg.msg = new(kafka.Message)
2094                 kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
2095                 log.Debug("topic:", topic_list[k])
2096                 log.Debug("sourceNameMap:", v.sourceNameMap)
2097                 log.Debug("measObjClassMap:", v.measObjClassMap)
2098                 log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
2099                 log.Debug("measTypesMap:", v.measTypesMap)
2100
2101                 b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2102                 if b == nil {
2103                         log.Info("File/obj ", filename, " for job: ", k, " - empty after filtering, discarding message")
2104                         return
2105                 }
2106                 kmsg.msg.Value = *b
2107
2108                 kmsg.topic = topic_list[k]
2109                 kmsg.jobid = k
2110
2111                 data_out_channel <- kmsg
2112         }
2113
2114 }
2115
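// json_pm_filter_to_byte applies the given filter maps to a PM json file and returns
// the filtered file marshalled to bytes, or nil if nothing matched the filter.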
2116 func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
2117
2118         if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
2119                 return nil
2120         }
2121         start := time.Now()
2122         j, err := jsoniter.Marshal(&data)
2123
2124         log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
2125
2126         if err != nil {
2127                 log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
2128                 return nil
2129         }
2130
2131         log.Debug("Filtered json obj: ", resource, " len: ", len(j))
2132         return &j
2133 }
2134
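// json_pm_filter_to_obj filters a PM json file in place on source name, measured
// object class, object instance id and measurement type. It returns nil if the
// filter removes all content, otherwise the filtered file.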
2135 func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
2136         filter_req := true
2137         start := time.Now()
2138         if len(sourceNameMap) != 0 {
2139                 if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
2140                         return nil
2142                 }
2143         }
2144         if filter_req {
2145                 modified := false
2146                 var temp_mil []MeasInfoList
2147                 for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2148
2149                         check_cntr := false
2150                         var cnt_flags []bool
2151                         if len(measTypesMap) > 0 {
2152                                 c_cntr := 0
2153                                 var temp_mtl []string
2154                                 for _, v := range zz.MeasTypes.SMeasTypesList {
2155                                         if measTypesMap[v] {
2156                                                 cnt_flags = append(cnt_flags, true)
2157                                                 c_cntr++
2158                                                 temp_mtl = append(temp_mtl, v)
2159                                         } else {
2160                                                 cnt_flags = append(cnt_flags, false)
2161                                         }
2162                                 }
2163                                 if c_cntr > 0 {
2164                                         check_cntr = true
2165                                         zz.MeasTypes.SMeasTypesList = temp_mtl
2166                                 } else {
2167                                         modified = true
2168                                         continue
2169                                 }
2170                         }
2171                         keep := false
2172                         var temp_mvl []MeasValues
2173                         for _, yy := range zz.MeasValuesList {
2174                                 keep_class := false
2175                                 keep_inst := false
2176                                 keep_cntr := false
2177
2178                                 dna := strings.Split(yy.MeasObjInstID, ",")
2179                                 instName := dna[len(dna)-1]
2180                                 cls := strings.Split(dna[len(dna)-1], "=")[0]
2181
2182                                 if len(measObjClassMap) > 0 {
2183                                         if measObjClassMap[cls] {
2184                                                 keep_class = true
2185                                         }
2186                                 } else {
2187                                         keep_class = true
2188                                 }
2189
2190                                 if len(measObjInstIdsMap) > 0 {
2191                                         if measObjInstIdsMap[instName] {
2192                                                 keep_inst = true
2193                                         }
2194                                 } else {
2195                                         keep_inst = true
2196                                 }
2197
2198                                 if check_cntr {
2199                                         var temp_mrl []MeasResults
2200                                         cnt_p := 1
2201                                         for _, v := range yy.MeasResultsList {
2202                                                 if cnt_flags[v.P-1] {
2203                                                         v.P = cnt_p
2204                                                         cnt_p++
2205                                                         temp_mrl = append(temp_mrl, v)
2206                                                 }
2207                                         }
2208                                         yy.MeasResultsList = temp_mrl
2209                                         keep_cntr = true
2210                                 } else {
2211                                         keep_cntr = true
2212                                 }
2213                                 if keep_class && keep_cntr && keep_inst {
2214                                         keep = true
2215                                         temp_mvl = append(temp_mvl, yy)
2216                                 }
2217                         }
2218                         if keep {
2219                                 zz.MeasValuesList = temp_mvl
2220                                 temp_mil = append(temp_mil, zz)
2221                                 modified = true
2222                         }
2223
2224                 }
2225                 //Only if modified
2226                 if modified {
2227                         if len(temp_mil) == 0 {
2228                                 log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
2229                                 return nil
2230                         }
2231                         data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
2232                 }
2233         }
2234         log.Debug("Filter: ", time.Since(start).String())
2235         return data
2236 }
2237
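// start_job_json_file_data_influx handles a type job that writes json PM data to
// influxdb. It maintains the per-job filters and influxdb parameters received on
// control_ch and spawns one run_json_file_data_job_influx goroutine per incoming
// file event, limited by jobLimiterChan.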
2238 func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
2239
2240         log.Info("Type job ", type_id, " started")
2241         log.Debug("influx job ch ", data_in_ch)
2242         filters := make(map[string]Filter)
2243         filterParams_list := make(map[string]FilterMaps)
2244         influx_job_params := make(map[string]InfluxJobParameters)
2245         var mc *minio.Client
2246         const mc_id = "mc_" + "start_job_json_file_data_influx"
2247         running := true
2248         for {
2249                 select {
2250                 case job_ctl := <-control_ch:
2251                         log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
2252                         switch job_ctl.command {
2253                         case "EXIT":
2254                                 //ignore cmd - handled by channel signal
2255                         case "ADD-FILTER":
2256
2257                                 filters[job_ctl.filter.JobId] = job_ctl.filter
2258                                 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
2259                                 log.Debug(job_ctl.filter)
2260                                 tmp_filterParams_list := make(map[string]FilterMaps)
2261                                 for k, v := range filterParams_list {
2262                                         tmp_filterParams_list[k] = v
2263                                 }
2264                                 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
2265                                 filterParams_list = tmp_filterParams_list
2266
2267                                 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2268                                 for k, v := range influx_job_params {
2269                                         tmp_influx_job_params[k] = v
2270                                 }
2271                                 tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
2272                                 influx_job_params = tmp_influx_job_params
2273
2274                         case "REMOVE-FILTER":
2275
2276                                 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
2277
2278                                 tmp_filterParams_list := make(map[string]FilterMaps)
2279                                 for k, v := range filterParams_list {
2280                                         tmp_filterParams_list[k] = v
2281                                 }
2282                                 delete(tmp_filterParams_list, job_ctl.filter.JobId)
2283                                 filterParams_list = tmp_filterParams_list
2284
2285                                 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2286                                 for k, v := range influx_job_params {
2287                                         tmp_influx_job_params[k] = v
2288                                 }
2289                                 delete(tmp_influx_job_params, job_ctl.filter.JobId)
2290                                 influx_job_params = tmp_influx_job_params
2291                         }
2292
2293                 case msg := <-data_in_ch:
2294                         log.Debug("Data received - influx")
2295                         if msg == nil {
2296                                 log.Info("Type job ", type_id, " stopped by channel signal - start_job_json_file_data_influx")
2297
2298                                 running = false
2299                                 return
2300                         }
2301                         if objectstore {
2302                                 if mc == nil {
2303                                         var err *error
2304                                         mc, err = create_minio_client(mc_id)
2305                                         if err != nil {
2306                                                 log.Debug("Cannot create minio client for type job: ", type_id)
2307                                         }
2308                                 }
2309                         }
2310
2311                         jobLimiterChan <- struct{}{}
2312                         go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
2313
2314                 case <-time.After(1 * time.Second):
2315                         if !running {
2316                                 return
2317                         }
2318                 }
2319         }
2320 }
2321
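// run_json_file_data_job_influx reads one json PM file (from the file volume or the
// filestore bucket), applies each job's filter and writes the measurement values as
// points to the influxdb instance configured for that job.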
2322 func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2323
2324         log.Debug("run_json_file_data_job_influx")
2325         //Release job limit
2326         defer func() {
2327                 <-jobLimiterChan
2328         }()
2329
2330         PrintMemUsage()
2331
2332         var evt_data FileDownloadedEvt
2333         err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2334         if err != nil {
2335                 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2336                 return
2337         }
2338         log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2339
2340         var reader io.Reader
2341
2342         INPUTBUCKET := "pm-files-json"
2343         filename := ""
2344         if !objectstore {
2345                 filename = files_volume + "/" + evt_data.Filename
2346                 fi, err := os.Open(filename)
2347
2348                 if err != nil {
2349                         log.Error("File: ", filename, " for type job: ", type_id, " - cannot be opened - discarding message - error details: ", err)
2350                         return
2351                 }
2352                 defer fi.Close()
2353                 reader = fi
2354         } else {
2355                 filename = "/" + evt_data.Filename
2356                 if mc != nil {
2357                         if !check_minio_bucket(mc, mc_id, INPUTBUCKET) {
2358                                 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2359                                 return
2360                         }
2361                         tctx := context.Background()
2362                         mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2363                         if err != nil {
2364                                 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2365                                 return
2366                         }
2367                         reader = mr
2368                         defer mr.Close()
2369                 } else {
2370                         log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2371                         return
2372                 }
2373         }
2374
2375         var data *[]byte
2376         if strings.HasSuffix(filename, "gz") {
2377                 start := time.Now()
2378                 var buf2 bytes.Buffer
2379                 errb := gunzipReaderToWriter(&buf2, reader)
2380                 if errb != nil {
2381                         log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2382                         return
2383                 }
2384                 d := buf2.Bytes()
2385                 data = &d
2386                 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2387         } else {
2388
2389                 start := time.Now()
2390                 d, err := io.ReadAll(reader)
2391                 if err != nil {
2392                         log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2393                         return
2394                 }
2395                 data = &d
2396
2397                 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2398         }
2399         for k, v := range filterList {
2400
2401                 var pmfile PMJsonFile
2402                 start := time.Now()
2403                 err = jsoniter.Unmarshal(*data, &pmfile)
2404                 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2405
2406                 if err != nil {
2407                         log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2408                         return
2409                 }
2410
2411                 if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
2412                         b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2413                         if b == nil {
2414                                 log.Info("File/obj ", filename, " for job: ", k, " - empty after filtering, discarding message")
2415                                 return
2416                         }
2417
2418                 }
2419                 fluxParms := influxList[k]
2420                 log.Debug("Influxdb params: ", fluxParms)
2421                 client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
2422                 writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
2423
2424                 for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2425                         ctr_names := make(map[string]string)
2426                         for cni, cn := range zz.MeasTypes.SMeasTypesList {
2427                                 ctr_names[strconv.Itoa(cni+1)] = cn
2428                         }
2429                         for _, xx := range zz.MeasValuesList {
2430                                 log.Debug("Measurement: ", xx.MeasObjInstID)
2431                                 log.Debug("Suspect flag: ", xx.SuspectFlag)
2432                                 p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
2433                                 p.AddField("suspectflag", xx.SuspectFlag)
2434                                 p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
2435                                 p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
2436                                 for _, yy := range xx.MeasResultsList {
2437                                         pi := strconv.Itoa(yy.P)
2438                                         pv := yy.SValue
2439                                         pn := ctr_names[pi]
2440                                         log.Debug("Counter: ", pn, " Value: ", pv)
2441                                         pv_i, err := strconv.Atoi(pv)
2442                                         if err == nil {
2443                                                 p.AddField(pn, pv_i)
2444                                         } else {
2445                                                 p.AddField(pn, pv)
2446                                         }
2447                                 }
2448                                 //p.SetTime(timeT)
2449                                 log.Debug("StartEpochMicrosec from common event header:  ", pmfile.Event.CommonEventHeader.StartEpochMicrosec)
2450                                 log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2451                                 p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2452                                 err := writeAPI.WritePoint(context.Background(), p)
2453                                 if err != nil {
2454                                         log.Error("Db write error: ", err)
2455                                 }
2456                         }
2457
2458                 }
2459                 client.Close()
2460         }
2461
2462 }