X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=kafka-pm-producer%2Fmain.go;fp=kafka-pm-producer%2Fmain.go;h=9c3d8a412054e32655d1c0f580177bb71a79e33c;hb=c5655db5780275b07b25b57b767808f2f1eac7d9;hp=0000000000000000000000000000000000000000;hpb=7c434fcb459c84543cdb0ad14aa59391c60d16d4;p=nonrtric%2Fplt%2Franpm.git diff --git a/kafka-pm-producer/main.go b/kafka-pm-producer/main.go new file mode 100644 index 0000000..9c3d8a4 --- /dev/null +++ b/kafka-pm-producer/main.go @@ -0,0 +1,2802 @@ +// ============LICENSE_START=============================================== +// Copyright (C) 2023 Nordix Foundation. All rights reserved. +// ======================================================================== +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============LICENSE_END================================================= +// + +package main + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/tls" + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "io" + "net" + "os/signal" + "reflect" + "strings" + "sync" + "syscall" + + "net/http" + "os" + "runtime" + "strconv" + "time" + + "github.com/google/uuid" + "golang.org/x/oauth2/clientcredentials" + + log "github.com/sirupsen/logrus" + + "github.com/gorilla/mux" + + "net/http/pprof" + + "github.com/confluentinc/confluent-kafka-go/kafka" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + jsoniter "github.com/json-iterator/go" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +//== Constants ==// + +const http_port = 80 +const https_port = 443 +const config_file = "application_configuration.json" +const server_crt = "server.crt" +const server_key = "server.key" + +const producer_name = "kafka-producer" + +const registration_delay_short = 2 +const registration_delay_long = 120 + +const mutexLocked = 1 + +const ( + Init AppStates = iota + Running + Terminating +) + +const reader_queue_length = 100 //Per type job +const writer_queue_length = 100 //Per info job +const parallelism_limiter = 100 //For all jobs + +// This are optional - set if using SASL protocol is used towards kafka +var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE") +var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET") +var creds_client_id = os.Getenv("CREDS_CLIENT_ID") +var creds_service_url = os.Getenv("AUTH_SERVICE_URL") + +//== Types ==// + +type AppStates int64 + +type FilterParameters struct { + MeasuredEntityDns []string `json:"measuredEntityDns"` + MeasTypes []string `json:"measTypes"` + MeasObjClass []string `json:"measObjClass"` + MeasObjInstIds []string `json:"measObjInstIds"` +} + +type InfoJobDataType struct { + InfoJobData struct { + KafkaOutputTopic string `json:"kafkaOutputTopic"` + + DbUrl string `json:"db-url"` + DbOrg string `json:"db-org"` + DbBucket string `json:"db-bucket"` + DbToken string `json:"db-token"` + + FilterParams FilterParameters `json:"filter"` + } `json:"info_job_data"` + InfoJobIdentity string `json:"info_job_identity"` + InfoTypeIdentity string 
`json:"info_type_identity"` + LastUpdated string `json:"last_updated"` + Owner string `json:"owner"` + TargetURI string `json:"target_uri"` +} + +// Type for an infojob +type InfoJobRecord struct { + job_info InfoJobDataType + output_topic string + + statistics *InfoJobStats +} + +// Type for an infojob +type TypeJobRecord struct { + InfoType string + InputTopic string + data_in_channel chan *KafkaPayload + reader_control chan ReaderControl + job_control chan JobControl + groupId string + clientId string + + statistics *TypeJobStats +} + +// Type for controlling the topic reader +type ReaderControl struct { + command string +} + +// Type for controlling the topic writer +type WriterControl struct { + command string +} + +// Type for controlling the job +type JobControl struct { + command string + filter Filter +} + +type KafkaPayload struct { + msg *kafka.Message + topic string + jobid string +} + +type FilterMaps struct { + sourceNameMap map[string]bool + measObjClassMap map[string]bool + measObjInstIdsMap map[string]bool + measTypesMap map[string]bool +} + +type InfluxJobParameters struct { + DbUrl string + DbOrg string + DbBucket string + DbToken string +} + +type Filter struct { + JobId string + OutputTopic string + filter FilterMaps + + influxParameters InfluxJobParameters +} + +// Type for info job statistics +type InfoJobStats struct { + out_msg_cnt int + out_data_vol int64 +} + +// Type for type job statistics +type TypeJobStats struct { + in_msg_cnt int + in_data_vol int64 +} + +// == API Datatypes ==// +// Type for supported data types +type DataType struct { + ID string `json:"id"` + KafkaInputTopic string `json:"kafkaInputTopic"` + InputJobType string `json:inputJobType` + InputJobDefinition struct { + KafkaOutputTopic string `json:kafkaOutputTopic` + } `json:inputJobDefinition` + + ext_job *[]byte + ext_job_created bool + ext_job_id string +} + +type DataTypes struct { + ProdDataTypes []DataType `json:"types"` +} + +type Minio_buckets struct { + Buckets map[string]bool +} + +//== External data types ==// + +// // Data type for event xml file download +type XmlFileEventHeader struct { + ProductName string `json:"productName"` + VendorName string `json:"vendorName"` + Location string `json:"location"` + Compression string `json:"compression"` + SourceName string `json:"sourceName"` + FileFormatType string `json:"fileFormatType"` + FileFormatVersion string `json:"fileFormatVersion"` + StartEpochMicrosec int64 `json:"startEpochMicrosec"` + LastEpochMicrosec int64 `json:"lastEpochMicrosec"` + Name string `json:"name"` + ChangeIdentifier string `json:"changeIdentifier"` + InternalLocation string `json:"internalLocation"` + TimeZoneOffset string `json:"timeZoneOffset"` + //ObjectStoreBucket string `json:"objectStoreBucket"` +} + +// Data types for input xml file +type MeasCollecFile struct { + XMLName xml.Name `xml:"measCollecFile"` + Text string `xml:",chardata"` + Xmlns string `xml:"xmlns,attr"` + Xsi string `xml:"xsi,attr"` + SchemaLocation string `xml:"schemaLocation,attr"` + FileHeader struct { + Text string `xml:",chardata"` + FileFormatVersion string `xml:"fileFormatVersion,attr"` + VendorName string `xml:"vendorName,attr"` + DnPrefix string `xml:"dnPrefix,attr"` + FileSender struct { + Text string `xml:",chardata"` + LocalDn string `xml:"localDn,attr"` + ElementType string `xml:"elementType,attr"` + } `xml:"fileSender"` + MeasCollec struct { + Text string `xml:",chardata"` + BeginTime string `xml:"beginTime,attr"` + } `xml:"measCollec"` + } `xml:"fileHeader"` + MeasData 
struct { + Text string `xml:",chardata"` + ManagedElement struct { + Text string `xml:",chardata"` + LocalDn string `xml:"localDn,attr"` + SwVersion string `xml:"swVersion,attr"` + } `xml:"managedElement"` + MeasInfo []struct { + Text string `xml:",chardata"` + MeasInfoId string `xml:"measInfoId,attr"` + Job struct { + Text string `xml:",chardata"` + JobId string `xml:"jobId,attr"` + } `xml:"job"` + GranPeriod struct { + Text string `xml:",chardata"` + Duration string `xml:"duration,attr"` + EndTime string `xml:"endTime,attr"` + } `xml:"granPeriod"` + RepPeriod struct { + Text string `xml:",chardata"` + Duration string `xml:"duration,attr"` + } `xml:"repPeriod"` + MeasType []struct { + Text string `xml:",chardata"` + P string `xml:"p,attr"` + } `xml:"measType"` + MeasValue []struct { + Text string `xml:",chardata"` + MeasObjLdn string `xml:"measObjLdn,attr"` + R []struct { + Text string `xml:",chardata"` + P string `xml:"p,attr"` + } `xml:"r"` + Suspect string `xml:"suspect"` + } `xml:"measValue"` + } `xml:"measInfo"` + } `xml:"measData"` + FileFooter struct { + Text string `xml:",chardata"` + MeasCollec struct { + Text string `xml:",chardata"` + EndTime string `xml:"endTime,attr"` + } `xml:"measCollec"` + } `xml:"fileFooter"` +} + +// Data type for json file +// Splitted in sevreal part to allow add/remove in lists +type MeasResults struct { + P int `json:"p"` + SValue string `json:"sValue"` +} + +type MeasValues struct { + MeasObjInstID string `json:"measObjInstId"` + SuspectFlag string `json:"suspectFlag"` + MeasResultsList []MeasResults `json:"measResults"` +} + +type SMeasTypes struct { + SMeasType string `json:"sMeasTypesList"` +} + +type MeasInfoList struct { + MeasInfoID struct { + SMeasInfoID string `json:"sMeasInfoId"` + } `json:"measInfoId"` + MeasTypes struct { + SMeasTypesList []string `json:"sMeasTypesList"` + } `json:"measTypes"` + MeasValuesList []MeasValues `json:"measValuesList"` +} + +type PMJsonFile struct { + Event struct { + CommonEventHeader struct { + Domain string `json:"domain"` + EventID string `json:"eventId"` + Sequence int `json:"sequence"` + EventName string `json:"eventName"` + SourceName string `json:"sourceName"` + ReportingEntityName string `json:"reportingEntityName"` + Priority string `json:"priority"` + StartEpochMicrosec int64 `json:"startEpochMicrosec"` + LastEpochMicrosec int64 `json:"lastEpochMicrosec"` + Version string `json:"version"` + VesEventListenerVersion string `json:"vesEventListenerVersion"` + TimeZoneOffset string `json:"timeZoneOffset"` + } `json:"commonEventHeader"` + Perf3GppFields struct { + Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"` + MeasDataCollection struct { + GranularityPeriod int `json:"granularityPeriod"` + MeasuredEntityUserName string `json:"measuredEntityUserName"` + MeasuredEntityDn string `json:"measuredEntityDn"` + MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"` + SMeasInfoList []MeasInfoList `json:"measInfoList"` + } `json:"measDataCollection"` + } `json:"perf3gppFields"` + } `json:"event"` +} + +// Data type for converted json file message +type FileDownloadedEvt struct { + Filename string `json:"filename"` +} + +//== Variables ==// + +var AppState = Init + +// Lock for all internal data +var datalock sync.Mutex + +var producer_instance_name string = producer_name + +// Keep all info type jobs, key == type id +var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord) + +// Keep all info jobs, key == job id +var InfoJobs map[string]InfoJobRecord = 
+make(map[string]InfoJobRecord)
+
+var InfoTypes DataTypes
+
+// Limiter - valid for all jobs
+var jobLimiterChan = make(chan struct{}, parallelism_limiter)
+
+// TODO: Config param?
+var bucket_location = "swe"
+
+var httpclient = &http.Client{}
+
+// == Env variables ==//
+var bootstrapserver = os.Getenv("KAFKA_SERVER")
+var files_volume = os.Getenv("FILES_VOLUME")
+var ics_server = os.Getenv("ICS")
+var self = os.Getenv("SELF")
+var filestore_user = os.Getenv("FILESTORE_USER")
+var filestore_pwd = os.Getenv("FILESTORE_PWD")
+var filestore_server = os.Getenv("FILESTORE_SERVER")
+
+var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
+var writer_control = make(chan WriterControl, 1)
+
+var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
+
+// == Main ==//
+func main() {
+
+	//log.SetLevel(log.InfoLevel)
+	log.SetLevel(log.TraceLevel)
+
+	log.Info("Server starting...")
+
+	if self == "" {
+		log.Panic("Env SELF not configured")
+	}
+	if bootstrapserver == "" {
+		log.Panic("Env KAFKA_SERVER not set")
+	}
+	if ics_server == "" {
+		log.Panic("Env ICS not set")
+	}
+	if os.Getenv("KP") != "" {
+		producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
+	}
+
+	rtr := mux.NewRouter()
+	rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
+	rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
+	rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
+	rtr.HandleFunc("/statistics", statistics)
+	rtr.HandleFunc("/logging/{level}", logging_level)
+	rtr.HandleFunc("/logging", logging_level)
+	rtr.HandleFunc("/", alive)
+
+	//For perf/mem profiling
+	rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
+
+	http.Handle("/", rtr)
+
+	http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
+
+	cer, err := tls.LoadX509KeyPair(server_crt, server_key)
+	if err != nil {
+		log.Errorf("Cannot load key and cert - %v", err)
+		return
+	}
+	config := &tls.Config{Certificates: []tls.Certificate{cer}}
+	https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
+
+	//TODO: Make http on/off configurable
+	// Run http
+	go func() {
+		log.Info("Starting http service...")
+		err := http_server.ListenAndServe()
+		if err == http.ErrServerClosed { // graceful shutdown
+			log.Info("http server shutdown...")
+		} else if err != nil {
+			log.Errorf("http server error: %v", err)
+		}
+	}()
+
+	//TODO: Make https on/off configurable
+	// Run https
+	go func() {
+		log.Info("Starting https service...")
+		err := https_server.ListenAndServeTLS("", "")
+		if err == http.ErrServerClosed { // graceful shutdown
+			log.Info("https server shutdown...")
+		} else if err != nil {
+			log.Errorf("https server error: %v", err)
+		}
+	}()
+	check_tcp(strconv.Itoa(http_port))
+	check_tcp(strconv.Itoa(https_port))
+
+	go start_topic_writer(writer_control, data_out_channel)
+
+	//Setup proc for periodic type registration
+	var event_chan = make(chan int) //Channel for stopping the proc
+	go periodic_registration(event_chan)
+
+	//Wait for term/int signal to try to shut down gracefully
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		sig := <-sigs
+		fmt.Printf("Received signal %s - application will terminate\n", sig)
+		event_chan <- 0 // Stop periodic registration
+		datalock.Lock()
+		defer datalock.Unlock()
+		AppState = Terminating
+		http_server.Shutdown(context.Background())
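+		// Shut down the https listener as well before stopping the type jobs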
https_server.Shutdown(context.Background()) + // Stopping jobs + for key, _ := range TypeJobs { + log.Info("Stopping type job:", key) + for _, dp := range InfoTypes.ProdDataTypes { + if key == dp.ID { + remove_type_job(dp) + } + } + } + }() + + AppState = Running + + //Wait until all go routines has exited + runtime.Goexit() + + fmt.Println("main routine exit") + fmt.Println("server stopped") +} + +func check_tcp(port string) { + log.Info("Checking tcp port: ", port) + for true { + address := net.JoinHostPort("localhost", port) + // 3 second timeout + conn, err := net.DialTimeout("tcp", address, 3*time.Second) + if err != nil { + log.Info("Checking tcp port: ", port, " failed, retrying...") + } else { + if conn != nil { + log.Info("Checking tcp port: ", port, " - OK") + _ = conn.Close() + return + } else { + log.Info("Checking tcp port: ", port, " failed, retrying...") + } + } + } +} + +//== Core functions ==// + +// Run periodic registration of producers +func periodic_registration(evtch chan int) { + var delay int = 1 + for { + select { + case msg := <-evtch: + if msg == 0 { // Stop thread + return + } + case <-time.After(time.Duration(delay) * time.Second): + ok := register_producer() + if ok { + delay = registration_delay_long + } else { + if delay < registration_delay_long { + delay += registration_delay_short + } else { + delay = registration_delay_short + } + } + } + } +} + +func register_producer() bool { + + log.Info("Registering producer: ", producer_instance_name) + + file, err := os.ReadFile(config_file) + if err != nil { + log.Error("Cannot read config file: ", config_file) + log.Error("Registering producer: ", producer_instance_name, " - failed") + return false + } + data := DataTypes{} + err = jsoniter.Unmarshal([]byte(file), &data) + if err != nil { + log.Error("Cannot parse config file: ", config_file) + log.Error("Registering producer: ", producer_instance_name, " - failed") + return false + } + var new_type_names []string + + for i := 0; i < len(data.ProdDataTypes); i++ { + t1 := make(map[string]interface{}) + t2 := make(map[string]interface{}) + + t2["schema"] = "http://json-schema.org/draft-07/schema#" + t2["title"] = data.ProdDataTypes[i].ID + t2["description"] = data.ProdDataTypes[i].ID + t2["type"] = "object" + + t1["info_job_data_schema"] = t2 + + json, err := json.Marshal(t1) + if err != nil { + log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID) + log.Error("Registering producer: ", producer_instance_name, " - failed") + return false + } else { + //TODO: http/https should be configurable + ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "") + if !ok { + log.Error("Cannot register type: ", data.ProdDataTypes[i].ID) + log.Error("Registering producer: ", producer_instance_name, " - failed") + return false + } + new_type_names = append(new_type_names, data.ProdDataTypes[i].ID) + } + + } + + log.Debug("Registering types: ", new_type_names) + m := make(map[string]interface{}) + m["supported_info_types"] = new_type_names + //TODO: http/https should be configurable + m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name + //TODO: http/https should be configurable + m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name + + json, err := json.Marshal(m) + if err != nil { + log.Error("Cannot create json for producer: ", producer_instance_name) + 
log.Error("Registering producer: ", producer_instance_name, " - failed") + return false + } + ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "") + if !ok { + log.Error("Cannot register producer: ", producer_instance_name) + log.Error("Registering producer: ", producer_instance_name, " - failed") + return false + } + datalock.Lock() + defer datalock.Unlock() + + var current_type_names []string + for _, v := range InfoTypes.ProdDataTypes { + current_type_names = append(current_type_names, v.ID) + if contains_str(new_type_names, v.ID) { + //Type exist + log.Debug("Type ", v.ID, " exists") + create_ext_job(v) + } else { + //Type is removed + log.Info("Removing type job for type: ", v.ID, " Type not in configuration") + remove_type_job(v) + } + } + + for _, v := range data.ProdDataTypes { + if contains_str(current_type_names, v.ID) { + //Type exist + log.Debug("Type ", v.ID, " exists") + create_ext_job(v) + } else { + //Type is new + log.Info("Adding type job for type: ", v.ID, " Type added to configuration") + start_type_job(v) + } + } + + InfoTypes = data + log.Debug("Datatypes: ", InfoTypes) + + log.Info("Registering producer: ", producer_instance_name, " - OK") + return true +} + +func remove_type_job(dp DataType) { + log.Info("Removing type job: ", dp.ID) + j, ok := TypeJobs[dp.ID] + if ok { + j.reader_control <- ReaderControl{"EXIT"} + } + + if dp.ext_job_created == true { + dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType) + //TODO: http/https should be configurable + ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "") + if !ok { + log.Error("Cannot delete job: ", dp.ext_job_id) + } + dp.ext_job_created = false + dp.ext_job = nil + } + +} + +func start_type_job(dp DataType) { + log.Info("Starting type job: ", dp.ID) + job_record := TypeJobRecord{} + + job_record.job_control = make(chan JobControl, 1) + job_record.reader_control = make(chan ReaderControl, 1) + job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length) + job_record.InfoType = dp.ID + job_record.InputTopic = dp.KafkaInputTopic + job_record.groupId = "kafka-procon-" + dp.ID + job_record.clientId = dp.ID + "-" + os.Getenv("KP") + var stats TypeJobStats + job_record.statistics = &stats + + switch dp.ID { + case "xml-file-data-to-filestore": + go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json") + case "xml-file-data": + go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "") + case "json-file-data-from-filestore": + go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true) + case "json-file-data": + go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false) + case "json-file-data-from-filestore-to-influx": + go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true) + // case "json-data-to-influx": + // go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel) + + default: + } + + go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats) + + TypeJobs[dp.ID] = job_record + 
log.Debug("Type job input type: ", dp.InputJobType) + create_ext_job(dp) +} + +func create_ext_job(dp DataType) { + if dp.InputJobType != "" { + jb := make(map[string]interface{}) + jb["info_type_id"] = dp.InputJobType + jb["job_owner"] = "console" //TODO: + jb["status_notification_uri"] = "http://callback:80/post" + jb1 := make(map[string]interface{}) + jb["job_definition"] = jb1 + jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic + + json, err := json.Marshal(jb) + dp.ext_job_created = false + dp.ext_job = nil + if err != nil { + log.Error("Cannot create json for type: ", dp.InputJobType) + return + } + + dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType) + //TODO: http/https should be configurable + ok := false + for !ok { + ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "") + if !ok { + log.Error("Cannot register job: ", dp.InputJobType) + //TODO: Restart after long time? + } + } + log.Debug("Registered job ok: ", dp.InputJobType) + dp.ext_job_created = true + dp.ext_job = &json + } +} + +func remove_info_job(jobid string) { + log.Info("Removing info job: ", jobid) + filter := Filter{} + filter.JobId = jobid + jc := JobControl{} + jc.command = "REMOVE-FILTER" + jc.filter = filter + infoJob := InfoJobs[jobid] + typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity] + typeJob.job_control <- jc + delete(InfoJobs, jobid) + +} + +// == Helper functions ==// + +// Function to check the status of a mutex lock +func MutexLocked(m *sync.Mutex) bool { + state := reflect.ValueOf(m).Elem().FieldByName("state") + return state.Int()&mutexLocked == mutexLocked +} + +// Test if slice contains a string +func contains_str(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} + +// Send a http request with json (json may be nil) +func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool { + + // set the HTTP method, url, and request body + var req *http.Request + var err error + if json == nil { + req, err = http.NewRequest(method, url, http.NoBody) + } else { + req, err = http.NewRequest(method, url, bytes.NewBuffer(json)) + req.Header.Set("Content-Type", "application/json; charset=utf-8") + } + if err != nil { + log.Error("Cannot create http request, method: ", method, " url: ", url) + return false + } + + if useAuth { + token, err := fetch_token() + if err != nil { + log.Error("Cannot fetch token for http request: ", err) + return false + } + req.Header.Set("Authorization", "Bearer "+token.TokenValue) + } + + log.Debug("HTTP request: ", req) + + log.Debug("Sending http request") + resp, err2 := httpclient.Do(req) + if err2 != nil { + log.Error("Http request error: ", err2) + log.Error("Cannot send http request method: ", method, " url: ", url) + } else { + if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 { + log.Debug("Accepted http status: ", resp.StatusCode) + resp.Body.Close() + return true + } + log.Debug("HTTP resp: ", resp) + resp.Body.Close() + } + return false +} + +// // Send a http request with json (json may be nil) +// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool { + +// // set the HTTP method, url, and request body +// var req *http.Request +// var err error +// if json == nil { +// req, err = http.NewRequest(method, url, http.NoBody) +// } else { +// req, err = http.NewRequest(method, url, 
bytes.NewBuffer(json)) +// req.Header.Set("Content-Type", "application/json; charset=utf-8") +// } +// if err != nil { +// log.Error("Cannot create http request, method: ", method, " url: ", url) +// return false +// } + +// if useAuth { +// token, err := fetch_token() +// if err != nil { +// log.Error("Cannot fetch token for http request: ", err) +// return false +// } +// req.Header.Set("Authorization", "Bearer "+token.TokenValue) +// } + +// log.Debug("HTTP request: ", req) + +// retries := 1 +// if retry { +// retries = 5 +// } +// sleep_time := 1 +// for i := retries; i > 0; i-- { +// log.Debug("Sending http request") +// resp, err2 := httpclient.Do(req) +// if err2 != nil { +// log.Error("Http request error: ", err2) +// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i-1) + +// time.Sleep(time.Duration(sleep_time) * time.Second) +// sleep_time = 2 * sleep_time +// } else { +// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 { +// log.Debug("Accepted http status: ", resp.StatusCode) +// resp.Body.Close() +// return true +// } +// log.Debug("HTTP resp: ", resp) +// resp.Body.Close() +// } +// } +// return false +// } + +// // Send a http request with json (json may be nil) +// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool { +// // initialize http client +// client := &http.Client{} + +// // set the HTTP method, url, and request body +// var req *http.Request +// var err error +// if json == nil { +// req, err = http.NewRequest(method, url, http.NoBody) +// } else { +// req, err = http.NewRequest(method, url, bytes.NewBuffer(json)) +// req.Header.Set("Content-Type", "application/json; charset=utf-8") +// } +// if err != nil { +// log.Error("Cannot create http request method: ", method, " url: ", url) +// return false +// } + +// useAuth = false +// if useAuth { +// token, err := fetch_token() +// if err != nil { +// log.Error("Cannot fetch token for http request: ", err) +// return false +// } +// req.Header.Add("Authorization", "Bearer "+token.TokenValue) +// } +// log.Debug("HTTP request: ", req) + +// b, berr := io.ReadAll(req.Body) +// if berr == nil { +// log.Debug("HTTP request body length: ", len(b)) +// } else { +// log.Debug("HTTP request - cannot check body length: ", berr) +// } +// if json == nil { +// log.Debug("HTTP request null json") +// } else { +// log.Debug("HTTP request json: ", string(json)) +// } +// requestDump, cerr := httputil.DumpRequestOut(req, true) +// if cerr != nil { +// fmt.Println(cerr) +// } +// fmt.Println(string(requestDump)) + +// retries := 1 +// if retry { +// retries = 5 +// } +// sleep_time := 1 +// for i := retries; i > 0; i-- { +// resp, err2 := client.Do(req) +// if err2 != nil { +// log.Error("Http request error: ", err2) +// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i) + +// time.Sleep(time.Duration(sleep_time) * time.Second) +// sleep_time = 2 * sleep_time +// } else { +// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 { +// log.Debug("Accepted http status: ", resp.StatusCode) +// defer resp.Body.Close() +// return true +// } +// } +// } +// return false +// } + +func fetch_token() (*kafka.OAuthBearerToken, error) { + log.Debug("Get token inline") + conf := &clientcredentials.Config{ + ClientID: creds_client_id, + ClientSecret: creds_client_secret, + TokenURL: creds_service_url, + } + token, err := conf.Token(context.Background()) + if err != 
nil { + log.Warning("Cannot fetch access token: ", err) + return nil, err + } + extensions := map[string]string{} + log.Debug("=====================================================") + log.Debug("token: ", token) + log.Debug("=====================================================") + log.Debug("TokenValue: ", token.AccessToken) + log.Debug("=====================================================") + log.Debug("Expiration: ", token.Expiry) + t := token.Expiry + // t := token.Expiry.Add(-time.Minute) + // log.Debug("Modified expiration: ", t) + oauthBearerToken := kafka.OAuthBearerToken{ + TokenValue: token.AccessToken, + Expiration: t, + Extensions: extensions, + } + + return &oauthBearerToken, nil +} + +// Function to print memory details +// https://pkg.go.dev/runtime#MemStats +func PrintMemUsage() { + if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v\n", m.NumGC) + fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys)) + fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys)) + fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys)) + fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys)) + fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys)) + fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys)) + fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys)) + } +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} + +func generate_uuid_from_type(s string) string { + if len(s) > 16 { + s = s[:16] + } + for len(s) < 16 { + s = s + "0" + } + b := []byte(s) + b = b[:16] + uuid, _ := uuid.FromBytes(b) + return uuid.String() +} + +// Write gzipped data to a Writer +func gzipWrite(w io.Writer, data *[]byte) error { + gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed) + + if err1 != nil { + return err1 + } + defer gw.Close() + _, err2 := gw.Write(*data) + return err2 +} + +// Write gunzipped data from Reader to a Writer +func gunzipReaderToWriter(w io.Writer, data io.Reader) error { + gr, err1 := gzip.NewReader(data) + + if err1 != nil { + return err1 + } + defer gr.Close() + data2, err2 := io.ReadAll(gr) + if err2 != nil { + return err2 + } + _, err3 := w.Write(data2) + if err3 != nil { + return err3 + } + return nil +} + +func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error { + tctx := context.Background() + err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location}) + if err != nil { + // Check to see if we already own this bucket (which happens if you run this twice) + exists, errBucketExists := mc.BucketExists(tctx, bucket) + if errBucketExists == nil && exists { + log.Debug("Already own bucket:", bucket) + add_bucket(client_id, bucket) + return nil + } else { + log.Error("Cannot create or check bucket ", bucket, " in minio client", err) + return err + } + } + log.Debug("Successfully created bucket: ", bucket) + add_bucket(client_id, bucket) + return nil +} + +func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool { + ok := bucket_exist(client_id, bucket) + if ok { + return true + } + tctx := context.Background() + exists, err := mc.BucketExists(tctx, bucket) + if err == nil && exists { + log.Debug("Already own bucket:", bucket) + return true + } + log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err) + return false +} + +func add_bucket(minio_id 
string, bucket string) { + datalock.Lock() + defer datalock.Unlock() + + b, ok := minio_bucketlist[minio_id] + if !ok { + b = Minio_buckets{} + b.Buckets = make(map[string]bool) + } + b.Buckets[bucket] = true + minio_bucketlist[minio_id] = b +} + +func bucket_exist(minio_id string, bucket string) bool { + datalock.Lock() + defer datalock.Unlock() + + b, ok := minio_bucketlist[minio_id] + if !ok { + return false + } + _, ok = b.Buckets[bucket] + return ok +} + +//== http api functions ==// + +// create/update job +func create_job(w http.ResponseWriter, req *http.Request) { + log.Debug("Create job, http method: ", req.Method) + if req.Method != http.MethodPost { + log.Error("Create job, http method not allowed") + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + ct := req.Header.Get("Content-Type") + if ct != "application/json" { + log.Error("Create job, bad content type") + http.Error(w, "Bad content type", http.StatusBadRequest) + return + } + + var t InfoJobDataType + err := json.NewDecoder(req.Body).Decode(&t) + if err != nil { + log.Error("Create job, cannot parse json,", err) + http.Error(w, "Cannot parse json", http.StatusBadRequest) + return + } + log.Debug("Creating job, id: ", t.InfoJobIdentity) + datalock.Lock() + defer datalock.Unlock() + + job_id := t.InfoJobIdentity + job_record, job_found := InfoJobs[job_id] + type_job_record, found_type := TypeJobs[t.InfoTypeIdentity] + if !job_found { + if !found_type { + log.Error("Type ", t.InfoTypeIdentity, " does not exist") + http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest) + return + } + } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity { + log.Error("Job cannot change type") + http.Error(w, "Job cannot change type", http.StatusBadRequest) + return + } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic { + log.Error("Job cannot change topic") + http.Error(w, "Job cannot change topic", http.StatusBadRequest) + return + } else if !found_type { + //Should never happen, if the type is removed then job is stopped + log.Error("Type ", t.InfoTypeIdentity, " does not exist") + http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest) + return + } + + //TODO: Verify that job contains enough parameters... 
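+	// For a new job, build the filter from the job parameters and hand it to the owning type job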
+ + if !job_found { + job_record = InfoJobRecord{} + job_record.job_info = t + output_topic := t.InfoJobData.KafkaOutputTopic + job_record.output_topic = t.InfoJobData.KafkaOutputTopic + log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic) + + var stats InfoJobStats + job_record.statistics = &stats + + filter := Filter{} + filter.JobId = job_id + filter.OutputTopic = job_record.output_topic + + jc := JobControl{} + + jc.command = "ADD-FILTER" + + //TODO: Refactor + if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" { + fm := FilterMaps{} + fm.sourceNameMap = make(map[string]bool) + fm.measObjClassMap = make(map[string]bool) + fm.measObjInstIdsMap = make(map[string]bool) + fm.measTypesMap = make(map[string]bool) + if t.InfoJobData.FilterParams.MeasuredEntityDns != nil { + for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns { + fm.sourceNameMap[v] = true + } + } + if t.InfoJobData.FilterParams.MeasObjClass != nil { + for _, v := range t.InfoJobData.FilterParams.MeasObjClass { + fm.measObjClassMap[v] = true + } + } + if t.InfoJobData.FilterParams.MeasObjInstIds != nil { + for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds { + fm.measObjInstIdsMap[v] = true + } + } + if t.InfoJobData.FilterParams.MeasTypes != nil { + for _, v := range t.InfoJobData.FilterParams.MeasTypes { + fm.measTypesMap[v] = true + } + } + filter.filter = fm + } + if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" { + influxparam := InfluxJobParameters{} + influxparam.DbUrl = t.InfoJobData.DbUrl + influxparam.DbOrg = t.InfoJobData.DbOrg + influxparam.DbBucket = t.InfoJobData.DbBucket + influxparam.DbToken = t.InfoJobData.DbToken + filter.influxParameters = influxparam + } + + jc.filter = filter + InfoJobs[job_id] = job_record + + type_job_record.job_control <- jc + + } else { + //TODO + //Update job + } +} + +// delete job +func delete_job(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodDelete { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + datalock.Lock() + defer datalock.Unlock() + + vars := mux.Vars(req) + + if id, ok := vars["job_id"]; ok { + if _, ok := InfoJobs[id]; ok { + remove_info_job(id) + w.WriteHeader(http.StatusNoContent) + log.Info("Job ", id, " deleted") + return + } + } + w.WriteHeader(http.StatusNotFound) +} + +// job supervision +func supervise_job(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + datalock.Lock() + defer datalock.Unlock() + + vars := mux.Vars(req) + + log.Debug("Supervising, job: ", vars["job_id"]) + if id, ok := vars["job_id"]; ok { + if _, ok := InfoJobs[id]; ok { + log.Debug("Supervision ok, job", id) + return + } + } + w.WriteHeader(http.StatusNotFound) +} + +// producer supervision +func supervise_producer(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + w.WriteHeader(http.StatusOK) +} + +// producer statictics, all jobs +func statistics(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + m := make(map[string]interface{}) + log.Debug("producer statictics, locked? 
", MutexLocked(&datalock)) + datalock.Lock() + defer datalock.Unlock() + req.Header.Set("Content-Type", "application/json; charset=utf-8") + m["number-of-jobs"] = len(InfoJobs) + m["number-of-types"] = len(InfoTypes.ProdDataTypes) + qm := make(map[string]interface{}) + m["jobs"] = qm + for key, elem := range InfoJobs { + jm := make(map[string]interface{}) + qm[key] = jm + jm["type"] = elem.job_info.InfoTypeIdentity + typeJob := TypeJobs[elem.job_info.InfoTypeIdentity] + jm["groupId"] = typeJob.groupId + jm["clientID"] = typeJob.clientId + jm["input topic"] = typeJob.InputTopic + jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel) + jm["output topic"] = elem.output_topic + jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel) + jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt + jm["msg_out (job)"] = elem.statistics.out_msg_cnt + + } + json, err := json.Marshal(m) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Error("Cannot marshal statistics json") + return + } + _, err = w.Write(json) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Error("Cannot send statistics json") + return + } +} + +// Simple alive check +func alive(w http.ResponseWriter, req *http.Request) { + //Alive check +} + +// Get/Set logging level +func logging_level(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + if level, ok := vars["level"]; ok { + if req.Method == http.MethodPut { + switch level { + case "trace": + log.SetLevel(log.TraceLevel) + case "debug": + log.SetLevel(log.DebugLevel) + case "info": + log.SetLevel(log.InfoLevel) + case "warn": + log.SetLevel(log.WarnLevel) + case "error": + log.SetLevel(log.ErrorLevel) + case "fatal": + log.SetLevel(log.FatalLevel) + case "panic": + log.SetLevel(log.PanicLevel) + default: + w.WriteHeader(http.StatusNotFound) + } + } else { + w.WriteHeader(http.StatusMethodNotAllowed) + } + } else { + if req.Method == http.MethodGet { + msg := "none" + if log.IsLevelEnabled(log.PanicLevel) { + msg = "panic" + } else if log.IsLevelEnabled(log.FatalLevel) { + msg = "fatal" + } else if log.IsLevelEnabled(log.ErrorLevel) { + msg = "error" + } else if log.IsLevelEnabled(log.WarnLevel) { + msg = "warn" + } else if log.IsLevelEnabled(log.InfoLevel) { + msg = "info" + } else if log.IsLevelEnabled(log.DebugLevel) { + msg = "debug" + } else if log.IsLevelEnabled(log.TraceLevel) { + msg = "trace" + } + w.Header().Set("Content-Type", "application/text") + w.Write([]byte(msg)) + } else { + w.WriteHeader(http.StatusMethodNotAllowed) + } + } +} + +func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) { + + log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id) + + topic_ok := false + var c *kafka.Consumer = nil + running := true + + for topic_ok == false { + + select { + case reader_ctrl := <-control_ch: + if reader_ctrl.command == "EXIT" { + log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") + //TODO: Stop consumer if present? 
+ data_ch <- nil //Signal to job handler + running = false + return + } + case <-time.After(1 * time.Second): + if !running { + return + } + if c == nil { + c = create_kafka_consumer(type_id, gid, cid) + if c == nil { + log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying") + } else { + log.Info("Consumer started on topic: ", topic, " for type: ", type_id) + } + } + if c != nil && topic_ok == false { + err := c.SubscribeTopics([]string{topic}, nil) + if err != nil { + log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err) + } else { + log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id) + topic_ok = true + } + } + } + } + log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id) + + var event_chan = make(chan int) + go func() { + for { + select { + case evt := <-c.Events(): + switch evt.(type) { + case kafka.OAuthBearerTokenRefresh: + log.Debug("New consumer token needed: ", evt) + token, err := fetch_token() + if err != nil { + log.Warning("Cannot cannot fetch token: ", err) + c.SetOAuthBearerTokenFailure(err.Error()) + } else { + setTokenError := c.SetOAuthBearerToken(*token) + if setTokenError != nil { + log.Warning("Cannot cannot set token: ", setTokenError) + c.SetOAuthBearerTokenFailure(setTokenError.Error()) + } + } + default: + //TODO: Handle these? + log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String()) + } + + case msg := <-event_chan: + if msg == 0 { + return + } + case <-time.After(1 * time.Second): + if !running { + return + } + } + } + }() + + go func() { + for { + //maxDur := 1 * time.Second + for { + select { + case reader_ctrl := <-control_ch: + if reader_ctrl.command == "EXIT" { + event_chan <- 0 + log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") + data_ch <- nil //Signal to job handler + defer c.Close() + return + } + default: + + ev := c.Poll(1000) + if ev == nil { + log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic) + continue + } + switch e := ev.(type) { + case *kafka.Message: + var kmsg KafkaPayload + kmsg.msg = e + + c.Commit() //TODO: Ok? + + //TODO: Check for exception + data_ch <- &kmsg + stats.in_msg_cnt++ + log.Debug("Reader msg: ", &kmsg) + log.Debug("Reader - data_ch ", data_ch) + case kafka.Error: + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + + case kafka.OAuthBearerTokenRefresh: + log.Debug("New consumer token needed: ", ev) + token, err := fetch_token() + if err != nil { + log.Warning("Cannot cannot fetch token: ", err) + c.SetOAuthBearerTokenFailure(err.Error()) + } else { + setTokenError := c.SetOAuthBearerToken(*token) + if setTokenError != nil { + log.Warning("Cannot cannot set token: ", setTokenError) + c.SetOAuthBearerTokenFailure(setTokenError.Error()) + } + } + default: + fmt.Printf("Ignored %v\n", e) + } + + // orig code + // msg, err := c.ReadMessage(maxDur) + // if err == nil { + // var kmsg KafkaPayload + // kmsg.msg = msg + + // c.Commit() //TODO: Ok? 
+ + // //TODO: Check for exception + // data_ch <- &kmsg + // stats.in_msg_cnt++ + // log.Debug("Reader msg: ", &kmsg) + // log.Debug("Reader - data_ch ", data_ch) + // } else { + // log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic, ", reason: ", err) + // } + + } + } + } + }() +} + +func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) { + + var kafka_producer *kafka.Producer + + running := true + log.Info("Topic writer starting") + + // Wait for kafka producer to become available - and be prepared to exit the writer + for kafka_producer == nil { + select { + case writer_ctl := <-control_ch: + if writer_ctl.command == "EXIT" { + //ignore cmd + } + default: + kafka_producer = start_producer() + if kafka_producer == nil { + log.Debug("Could not start kafka producer - retrying") + time.Sleep(1 * time.Second) + } else { + log.Debug("Kafka producer started") + //defer kafka_producer.Close() + } + } + } + + var event_chan = make(chan int) + go func() { + for { + select { + case evt := <-kafka_producer.Events(): + //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend? + switch evt.(type) { + case *kafka.Message: + m := evt.(*kafka.Message) + + if m.TopicPartition.Error != nil { + log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error) + } else { + log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition) + } + case kafka.Error: + log.Debug("Dumping topic writer event, error: ", evt) + case kafka.OAuthBearerTokenRefresh: + log.Debug("New producer token needed: ", evt) + token, err := fetch_token() + if err != nil { + log.Warning("Cannot cannot fetch token: ", err) + kafka_producer.SetOAuthBearerTokenFailure(err.Error()) + } else { + setTokenError := kafka_producer.SetOAuthBearerToken(*token) + if setTokenError != nil { + log.Warning("Cannot cannot set token: ", setTokenError) + kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error()) + } + } + default: + log.Debug("Dumping topic writer event, unknown: ", evt) + } + + case msg := <-event_chan: + if msg == 0 { + return + } + case <-time.After(1 * time.Second): + if !running { + return + } + } + } + }() + go func() { + for { + select { + case writer_ctl := <-control_ch: + if writer_ctl.command == "EXIT" { + // ignore - wait for channel signal + } + + case kmsg := <-data_ch: + if kmsg == nil { + event_chan <- 0 + // TODO: Close producer? + log.Info("Topic writer stopped by channel signal - start_topic_writer") + defer kafka_producer.Close() + return + } + + retries := 10 + msg_ok := false + var err error + for retry := 1; retry <= retries && msg_ok == false; retry++ { + err = kafka_producer.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny}, + Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil) + + if err == nil { + incr_out_msg_cnt(kmsg.jobid) + msg_ok = true + log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic) + } else { + log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err) + time.Sleep(time.Duration(retry) * time.Second) + } + } + if !msg_ok { + //TODO: Retry sending msg? + log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. 
Error details: ", err) + } + case <-time.After(1000 * time.Millisecond): + if !running { + return + } + } + } + }() +} + +func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer { + var cm kafka.ConfigMap + if creds_grant_type == "" { + log.Info("Creating kafka SASL plain text consumer for type: ", type_id) + cm = kafka.ConfigMap{ + "bootstrap.servers": bootstrapserver, + "group.id": gid, + "client.id": cid, + "auto.offset.reset": "latest", + "enable.auto.commit": false, + //"auto.commit.interval.ms": 5000, + } + } else { + log.Info("Creating kafka SASL plain text consumer for type: ", type_id) + cm = kafka.ConfigMap{ + "bootstrap.servers": bootstrapserver, + "group.id": gid, + "client.id": cid, + "auto.offset.reset": "latest", + "enable.auto.commit": false, + "sasl.mechanism": "OAUTHBEARER", + "security.protocol": "SASL_PLAINTEXT", + } + } + c, err := kafka.NewConsumer(&cm) + + //TODO: How to handle autocommit or commit message by message + //TODO: Make arg to kafka configurable + + if err != nil { + log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err) + return nil + } + + //c.Commit() + log.Info("Created kafka consumer for type: ", type_id, " OK") + return c +} + +// Start kafka producer +func start_producer() *kafka.Producer { + log.Info("Creating kafka producer") + + var cm kafka.ConfigMap + if creds_grant_type == "" { + log.Info("Creating kafka SASL plain text producer") + cm = kafka.ConfigMap{ + "bootstrap.servers": bootstrapserver, + } + } else { + log.Info("Creating kafka SASL plain text producer") + cm = kafka.ConfigMap{ + "bootstrap.servers": bootstrapserver, + "sasl.mechanism": "OAUTHBEARER", + "security.protocol": "SASL_PLAINTEXT", + } + } + + p, err := kafka.NewProducer(&cm) + if err != nil { + log.Error("Cannot create kafka producer,", err) + return nil + } + return p +} + +func start_adminclient() *kafka.AdminClient { + log.Info("Creating kafka admin client") + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver}) + if err != nil { + log.Error("Cannot create kafka admin client,", err) + return nil + } + return a +} + +func create_minio_client(id string) (*minio.Client, *error) { + log.Debug("Get minio client") + minio_client, err := minio.New(filestore_server, &minio.Options{ + Secure: false, + Creds: credentials.NewStaticV4(filestore_user, filestore_pwd, ""), + }) + if err != nil { + log.Error("Cannot create minio client, ", err) + return nil, &err + } + return minio_client, nil +} + +func incr_out_msg_cnt(jobid string) { + j, ok := InfoJobs[jobid] + if ok { + j.statistics.out_msg_cnt++ + } +} + +func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) { + + log.Info("Type job", type_id, " started") + + filters := make(map[string]Filter) + topic_list := make(map[string]string) + var mc *minio.Client + const mc_id = "mc_" + "start_job_xml_file_data" + running := true + for { + select { + case job_ctl := <-control_ch: + log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) + switch job_ctl.command { + case "EXIT": + //ignore cmd - handled by channel signal + case "ADD-FILTER": + filters[job_ctl.filter.JobId] = job_ctl.filter + log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) + + tmp_topic_list := make(map[string]string) + for k, v := range topic_list { + tmp_topic_list[k] = v + } + 
tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic + topic_list = tmp_topic_list + case "REMOVE-FILTER": + log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) + + tmp_topic_list := make(map[string]string) + for k, v := range topic_list { + tmp_topic_list[k] = v + } + delete(tmp_topic_list, job_ctl.filter.JobId) + topic_list = tmp_topic_list + } + + case msg := <-data_in_ch: + if msg == nil { + log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") + + running = false + return + } + if fsbucket != "" && fvolume == "" { + if mc == nil { + var err *error + mc, err = create_minio_client(mc_id) + if err != nil { + log.Debug("Cannot create minio client for type job: ", type_id) + } + } + } + //TODO: Sort processed file conversions in order (FIFO) + jobLimiterChan <- struct{}{} + go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id) + + case <-time.After(1 * time.Second): + if !running { + return + } + } + } + //}() +} + +func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) { + defer func() { + <-jobLimiterChan + }() + PrintMemUsage() + + if fvolume == "" && fsbucket == "" { + log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message") + return + } else if (fvolume != "") && (fsbucket != "") { + log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message") + return + } + + start := time.Now() + var evt_data XmlFileEventHeader + + err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) + if err != nil { + log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err) + return + } + log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String()) + + var reader io.Reader + + //TODO -> config + INPUTBUCKET := "ropfiles" + + filename := "" + if fvolume != "" { + filename = fvolume + "/" + evt_data.Name + fi, err := os.Open(filename) + + if err != nil { + log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err) + return + } + defer fi.Close() + reader = fi + //} else if evt_data.ObjectStoreBucket != "" { + } else { + filename = evt_data.Name + if mc != nil { + tctx := context.Background() + mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) + if err != nil { + log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err) + return + } + if mr == nil { + log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err) + return + } + reader = mr + defer mr.Close() + } else { + log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message") + return + } + } + + if reader == nil { + log.Error("Cannot get: ", filename, " - null reader") + return + } + var file_bytes []byte + if strings.HasSuffix(filename, "gz") { + start := time.Now() + var buf3 bytes.Buffer + errb := gunzipReaderToWriter(&buf3, reader) + if errb != nil { + log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb) + return + } + file_bytes = buf3.Bytes() + 
log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes)) + + } else { + var buf3 bytes.Buffer + _, err2 := io.Copy(&buf3, reader) + if err2 != nil { + log.Error("File ", filename, " - cannot be read, discarding message, ", err) + return + } + file_bytes = buf3.Bytes() + } + start = time.Now() + b, err := xml_to_json_conv(&file_bytes, &evt_data) + if err != nil { + log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err) + return + } + log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b)) + + new_fn := evt_data.Name + os.Getenv("KP") + ".json" + if outputCompression == "gz" { + new_fn = new_fn + ".gz" + start = time.Now() + var buf bytes.Buffer + err = gzipWrite(&buf, &b) + if err != nil { + log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err) + return + } + b = buf.Bytes() + log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes)) + + } + start = time.Now() + + if fvolume != "" { + //Store on disk + err = os.WriteFile(fvolume+"/"+new_fn, b, 0644) + if err != nil { + log.Error("Cannot write file ", new_fn, " - discarding message,", err) + return + } + log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes)) + } else if fsbucket != "" { + // Store in minio + objectName := new_fn + if mc != nil { + + contentType := "application/json" + if strings.HasSuffix(objectName, ".gz") { + contentType = "application/gzip" + } + + // Upload the xml file with PutObject + r := bytes.NewReader(b) + tctx := context.Background() + if check_minio_bucket(mc, mc_id, fsbucket) == false { + err := create_minio_bucket(mc, mc_id, fsbucket) + if err != nil { + log.Error("Cannot create bucket: ", fsbucket, ", ", err) + return + } + } + ok := false + for i := 1; i < 64 && ok == false; i = i * 2 { + info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType}) + if err != nil { + + if i == 1 { + log.Warn("Cannot upload (first attempt): ", objectName, ", ", err) + } else { + log.Warn("Cannot upload (retry): ", objectName, ", ", err) + } + time.Sleep(time.Duration(i) * time.Second) + } else { + log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String()) + log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size) + ok = true + } + } + if !ok { + log.Error("Cannot upload : ", objectName, ", ", err) + } + } else { + log.Error("Cannot upload: ", objectName, ", no client") + } + } + + start = time.Now() + if fvolume == "" { + var fde FileDownloadedEvt + fde.Filename = new_fn + j, err := jsoniter.Marshal(fde) + + if err != nil { + log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err) + return + } + msg.msg.Value = j + } else { + var fde FileDownloadedEvt + fde.Filename = new_fn + j, err := jsoniter.Marshal(fde) + + if err != nil { + log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err) + return + } + msg.msg.Value = j + } + msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"") + log.Debug("Marshal file-collect event ", time.Since(start).String()) + + for k, v := range topic_list { + var kmsg *KafkaPayload = new(KafkaPayload) + kmsg.msg = msg.msg + kmsg.topic = v + kmsg.jobid = k + data_out_channel <- kmsg + } +} + +func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) { + var f MeasCollecFile + start := time.Now() + err := 
+
+func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
+	var f MeasCollecFile
+	start := time.Now()
+	err := xml.Unmarshal(*f_byteValue, &f)
+	if err != nil {
+		return nil, errors.New("cannot unmarshal xml-file")
+	}
+	log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
+
+	start = time.Now()
+	var pmfile PMJsonFile
+
+	//TODO: Fill in more values
+	pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
+	pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
+	pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
+	pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
+	pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
+
+	for _, it := range f.MeasData.MeasInfo {
+		var mili MeasInfoList
+		mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
+		for _, jt := range it.MeasType {
+			mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
+		}
+		for _, jt := range it.MeasValue {
+			var mv MeasValues
+			mv.MeasObjInstID = jt.MeasObjLdn
+			mv.SuspectFlag = jt.Suspect
+			if jt.Suspect == "" {
+				mv.SuspectFlag = "false"
+			}
+			for _, kt := range jt.R {
+				ni, _ := strconv.Atoi(kt.P)
+				nv := kt.Text
+				mr := MeasResults{ni, nv}
+				mv.MeasResultsList = append(mv.MeasResultsList, mr)
+			}
+			mili.MeasValuesList = append(mili.MeasValuesList, mv)
+		}
+
+		pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
+	}
+
+	//TODO: Fill in more values
+	pmfile.Event.CommonEventHeader.Domain = ""              //xfeh.Domain
+	pmfile.Event.CommonEventHeader.EventID = ""             //xfeh.EventID
+	pmfile.Event.CommonEventHeader.Sequence = 0             //xfeh.Sequence
+	pmfile.Event.CommonEventHeader.EventName = ""           //xfeh.EventName
+	pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
+	pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
+	pmfile.Event.CommonEventHeader.Priority = ""            //xfeh.Priority
+	pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
+	pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
+	pmfile.Event.CommonEventHeader.Version = ""                 //xfeh.Version
+	pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
+	pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
+
+	log.Debug("Convert xml to json: ", time.Since(start).String())
+
+	start = time.Now()
+	json, err := jsoniter.Marshal(pmfile)
+	log.Debug("Marshal json: ", time.Since(start).String())
+
+	if err != nil {
+		return nil, errors.New("cannot marshal converted json")
+	}
+	return json, nil
+}
+
+func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
+
+	log.Info("Type job ", type_id, " started")
+
+	filters := make(map[string]Filter)
+	filterParams_list := make(map[string]FilterMaps)
+	// ch_list := make(map[string]chan *KafkaPayload)
+	topic_list := make(map[string]string)
+	var mc *minio.Client
+	const mc_id = "mc_" + "start_job_json_file_data"
+	running := true
+	for {
+		select {
+		case job_ctl := <-control_ch:
+			log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
+			switch job_ctl.command {
+			case "EXIT":
+				//ignore cmd - handled by channel signal
+			case "ADD-FILTER":
+				//TODO: Refactor...
+				filters[job_ctl.filter.JobId] = job_ctl.filter
+				log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
+
+				tmp_filterParams_list := make(map[string]FilterMaps)
+				for k, v := range filterParams_list {
+					tmp_filterParams_list[k] = v
+				}
+				tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
+				filterParams_list = tmp_filterParams_list
+
+				tmp_topic_list := make(map[string]string)
+				for k, v := range topic_list {
+					tmp_topic_list[k] = v
+				}
+				tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
+				topic_list = tmp_topic_list
+			case "REMOVE-FILTER":
+				//TODO: Refactor...
+				log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
+
+				tmp_filterParams_list := make(map[string]FilterMaps)
+				for k, v := range filterParams_list {
+					tmp_filterParams_list[k] = v
+				}
+				delete(tmp_filterParams_list, job_ctl.filter.JobId)
+				filterParams_list = tmp_filterParams_list
+
+				tmp_topic_list := make(map[string]string)
+				for k, v := range topic_list {
+					tmp_topic_list[k] = v
+				}
+				delete(tmp_topic_list, job_ctl.filter.JobId)
+				topic_list = tmp_topic_list
+			}
+
+		case msg := <-data_in_ch:
+			if msg == nil {
+				log.Info("Type job ", type_id, " stopped by channel signal - start_job_json_file_data")
+
+				running = false
+				return
+			}
+			if objectstore {
+				if mc == nil {
+					var err *error
+					mc, err = create_minio_client(mc_id)
+					if err != nil {
+						log.Debug("Cannot create minio client for type job: ", type_id)
+					}
+				}
+			}
+			//TODO: Sort processed file conversions in order (FIFO)
+			jobLimiterChan <- struct{}{}
+			go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
+
+		case <-time.After(1 * time.Second):
+			if !running {
+				return
+			}
+		}
+	}
+}
+
+func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
+
+	//Release job limit
+	defer func() {
+		<-jobLimiterChan
+	}()
+
+	PrintMemUsage()
+
+	var evt_data FileDownloadedEvt
+	err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
+	if err != nil {
+		log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
+		return
+	}
+	log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
+
+	var reader io.Reader
+
+	//TODO -> config
+	//INPUTBUCKET := "json-file-ready"
+	INPUTBUCKET := "pm-files-json"
+	filename := ""
+	if !objectstore {
+		filename = files_volume + "/" + evt_data.Filename
+		fi, err := os.Open(filename)
+
+		if err != nil {
+			log.Error("File: ", filename, " for type job: ", type_id, " - cannot be opened - discarding message, error details: ", err)
+			return
+		}
+		defer fi.Close()
+		reader = fi
+	} else {
+		filename = "/" + evt_data.Filename
+		if mc != nil {
+			if !check_minio_bucket(mc, mc_id, INPUTBUCKET) {
+				log.Error("Bucket not available for reading in type job: ", type_id, " bucket: ", INPUTBUCKET)
+				return
+			}
+			tctx := context.Background()
+			mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
+			if err != nil {
+				log.Error("Obj: ", filename, " for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
+				return
+			}
+			reader = mr
+			defer mr.Close()
+		} else {
+			log.Error("Cannot get obj: ", filename, " for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
+			return
+		}
+	}
+
+	var data *[]byte
+	if strings.HasSuffix(filename, "gz") {
+		start := time.Now()
+		var buf2 bytes.Buffer
+		errb := gunzipReaderToWriter(&buf2, reader)
+		if errb != nil {
+			log.Error("Cannot decompress file/obj ", filename, " for type job: ", type_id, " - discarding message, error details: ", errb)
+			return
+		}
+		d := buf2.Bytes()
+		data = &d
+		log.Debug("Decompress file/obj ", filename, " for type job: ", type_id, " time: ", time.Since(start).String())
+	} else {
+
+		start := time.Now()
+		d, err := io.ReadAll(reader)
+		if err != nil {
+			log.Error("Cannot read file/obj: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
+			return
+		}
+		data = &d
+
+		log.Debug("Read file/obj: ", filename, " for type job: ", type_id, " time: ", time.Since(start).String())
+	}
+
+	// Unmarshalling is done once per job inside the loop - a deep copy of PMJsonFile would be needed to share one instance between jobs
+	for k, v := range filterList {
+
+		var pmfile PMJsonFile
+		start := time.Now()
+		err = jsoniter.Unmarshal(*data, &pmfile)
+		log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
+
+		if err != nil {
+			log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details: ", err)
+			return
+		}
+
+		var kmsg *KafkaPayload = new(KafkaPayload)
+		kmsg.msg = new(kafka.Message)
+		kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
+		log.Debug("topic: ", topic_list[k])
+		log.Debug("sourceNameMap: ", v.sourceNameMap)
+		log.Debug("measObjClassMap: ", v.measObjClassMap)
+		log.Debug("measObjInstIdsMap: ", v.measObjInstIdsMap)
+		log.Debug("measTypesMap: ", v.measTypesMap)
+		//BMX if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
+		b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
+		if b == nil {
+			log.Info("File/obj ", filename, " for job: ", k, " - empty after filtering, discarding message")
+			return
+		}
+		kmsg.msg.Value = *b
+		//BMX}
+
+		kmsg.topic = topic_list[k]
+		kmsg.jobid = k
+
+		data_out_channel <- kmsg
+	}
+}
+
+func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
+
+	if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
+		return nil
+	}
+	start := time.Now()
+	j, err := jsoniter.Marshal(&data)
+
+	log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
+
+	if err != nil {
+		log.Error("Msg could not be marshalled - discarding message, obj: ", resource, ", error details: ", err)
+		return nil
+	}
+
+	log.Debug("Filtered json obj: ", resource, " len: ", len(j))
+	return &j
+}
+
+func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
+	filter_req := true
+	start := time.Now()
+	if len(sourceNameMap) != 0 {
+		if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
+			filter_req = false
+			return nil
+		}
+	}
+	if filter_req {
+		modified := false
+		var temp_mil []MeasInfoList
+		for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
+
+			check_cntr := false
+			var cnt_flags []bool
+			if len(measTypesMap) > 0 {
+				c_cntr := 0
+				var temp_mtl []string
+				for _, v := range zz.MeasTypes.SMeasTypesList {
+					if measTypesMap[v] {
+						cnt_flags = append(cnt_flags, true)
+						c_cntr++
+						temp_mtl = append(temp_mtl, v)
+					} else {
+						cnt_flags = append(cnt_flags, false)
+					}
+				}
+				if c_cntr > 0 {
+					check_cntr = true
+					zz.MeasTypes.SMeasTypesList = temp_mtl
+				} else {
+					modified = true
+					continue
+				}
+			}
+			keep := false
+			var temp_mvl []MeasValues
+			for _, yy := range zz.MeasValuesList {
+				keep_class := false
+				keep_inst := false
+				keep_cntr := false
+
+				dna := strings.Split(yy.MeasObjInstID, ",")
+				instName := dna[len(dna)-1]
+				cls := strings.Split(dna[len(dna)-1], "=")[0]
+
+				if len(measObjClassMap) > 0 {
+					if measObjClassMap[cls] {
+						keep_class = true
+					}
+				} else {
+					keep_class = true
+				}
+
+				if len(measObjInstIdsMap) > 0 {
+					if measObjInstIdsMap[instName] {
+						keep_inst = true
+					}
+				} else {
+					keep_inst = true
+				}
+
+				if check_cntr {
+					var temp_mrl []MeasResults
+					cnt_p := 1
+					for _, v := range yy.MeasResultsList {
+						if cnt_flags[v.P-1] {
+							v.P = cnt_p
+							cnt_p++
+							temp_mrl = append(temp_mrl, v)
+						}
+					}
+					yy.MeasResultsList = temp_mrl
+					keep_cntr = true
+				} else {
+					keep_cntr = true
+				}
+				if keep_class && keep_cntr && keep_inst {
+					keep = true
+					temp_mvl = append(temp_mvl, yy)
+				}
+			}
+			if keep {
+				zz.MeasValuesList = temp_mvl
+				temp_mil = append(temp_mil, zz)
+				modified = true
+			}
+
+		}
+		//Only if modified
+		if modified {
+			if len(temp_mil) == 0 {
+				log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
+				return nil
+			}
+			data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
+		}
+	}
+	log.Debug("Filter: ", time.Since(start).String())
+	return data
+}
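+
+// NOTE (added illustration, not referenced by the code above): json_pm_filter_to_obj derives
+// the instance name and class from the last RDN of MeasObjInstID; e.g. the made-up DN
+// "ManagedElement=me1,GNBDUFunction=gnb1,NRCellDU=cell1" yields instance "NRCellDU=cell1"
+// and class "NRCellDU". The helper below isolates that parsing step.
+func meas_obj_class_and_instance(measObjInstId string) (string, string) {
+	dna := strings.Split(measObjInstId, ",")
+	instName := dna[len(dna)-1]
+	cls := strings.Split(instName, "=")[0]
+	return cls, instName
+}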
+ log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) + + tmp_filterParams_list := make(map[string]FilterMaps) + for k, v := range filterParams_list { + tmp_filterParams_list[k] = v + } + delete(tmp_filterParams_list, job_ctl.filter.JobId) + filterParams_list = tmp_filterParams_list + + tmp_influx_job_params := make(map[string]InfluxJobParameters) + for k, v := range influx_job_params { + tmp_influx_job_params[k] = v + } + delete(tmp_influx_job_params, job_ctl.filter.JobId) + influx_job_params = tmp_influx_job_params + } + + case msg := <-data_in_ch: + log.Debug("Data reveived - influx") + if msg == nil { + log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") + + running = false + return + } + if objectstore { + if mc == nil { + var err *error + mc, err = create_minio_client(mc_id) + if err != nil { + log.Debug("Cannot create minio client for type job: ", type_id) + } + } + } + //TODO: Sort processed file conversions in order (FIFO) + jobLimiterChan <- struct{}{} + go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore) + + case <-time.After(1 * time.Second): + if !running { + return + } + } + } +} + +func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) { + + log.Debug("run_json_file_data_job_influx") + //Release job limit + defer func() { + <-jobLimiterChan + }() + + PrintMemUsage() + + var evt_data FileDownloadedEvt + err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) + if err != nil { + log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err) + return + } + log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename) + + var reader io.Reader + + //TODO -> config + //INPUTBUCKET := "json-file-ready" + INPUTBUCKET := "pm-files-json" + filename := "" + if objectstore == false { + filename = files_volume + "/" + evt_data.Filename + fi, err := os.Open(filename) + + if err != nil { + log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err) + return + } + defer fi.Close() + reader = fi + } else { + filename = "/" + evt_data.Filename + if mc != nil { + if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false { + log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET) + return + } + tctx := context.Background() + mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) + if err != nil { + log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err) + return + } + reader = mr + defer mr.Close() + } else { + log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client") + return + } + } + + var data *[]byte + if strings.HasSuffix(filename, "gz") { + start := time.Now() + var buf2 bytes.Buffer + errb := gunzipReaderToWriter(&buf2, reader) + if errb != nil { + log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb) + return + } + d := buf2.Bytes() + data = &d + log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String()) + } else { + + start := 
time.Now() + d, err := io.ReadAll(reader) + if err != nil { + log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err) + return + } + data = &d + + log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String()) + } + // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside + // var pmfile PMJsonFile + // start := time.Now() + // err = jsoniter.Unmarshal(*data, &pmfile) + // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) + + // if err != nil { + // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) + // return + // } + for k, v := range filterList { + + var pmfile PMJsonFile + start := time.Now() + err = jsoniter.Unmarshal(*data, &pmfile) + log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) + + if err != nil { + log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) + return + } + + if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 { + b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap) + if b == nil { + log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ") + return + } + + } + fluxParms := influxList[k] + log.Debug("Influxdb params: ", fluxParms) + client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken) + writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket) + + // fmt.Println(pmfile.Event.CommonEventHeader.StartEpochMicrosec) + // tUnix := pmfile.Event.CommonEventHeader.StartEpochMicrosec / int64(time.Millisecond) + // tUnixNanoRemainder := (pmfile.Event.CommonEventHeader.StartEpochMicrosec % int64(time.Millisecond)) * int64(time.Microsecond) + // timeT := time.Unix(tUnix, tUnixNanoRemainder) + // fmt.Println(timeT) + // fmt.Println("======================") + for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList { + ctr_names := make(map[string]string) + for cni, cn := range zz.MeasTypes.SMeasTypesList { + ctr_names[string(cni+1)] = cn + } + for _, xx := range zz.MeasValuesList { + log.Debug("Measurement: ", xx.MeasObjInstID) + log.Debug("Suspect flag: ", xx.SuspectFlag) + p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID) + p.AddField("suspectflag", xx.SuspectFlag) + p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod) + p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset) + for _, yy := range xx.MeasResultsList { + pi := string(yy.P) + pv := yy.SValue + pn := ctr_names[pi] + log.Debug("Counter: ", pn, " Value: ", pv) + pv_i, err := strconv.Atoi(pv) + if err == nil { + p.AddField(pn, pv_i) + } else { + p.AddField(pn, pv) + } + } + //p.SetTime(timeT) + log.Debug("StartEpochMicrosec from common event header: ", pmfile.Event.CommonEventHeader.StartEpochMicrosec) + log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0)) + p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0)) + err := writeAPI.WritePoint(context.Background(), p) + if err != nil { + log.Error("Db write error: ", err) + } + } + + } + client.Close() + } + +}
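+
+// NOTE (added illustration, not referenced by the code above): run_json_file_data_job_influx
+// maps 1-based counter positions (MeasResults.P) to counter names taken from SMeasTypesList
+// before adding fields to the influx point. The helper below captures that mapping on its own.
+func counter_names_by_position(measTypes []string) map[string]string {
+	names := make(map[string]string, len(measTypes))
+	for i, name := range measTypes {
+		names[strconv.Itoa(i+1)] = name
+	}
+	return names
+}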