Remove unused code 46/11546/2
authorktimoney <kevin.timoney@est.tech>
Thu, 8 Jun 2023 14:20:53 +0000 (15:20 +0100)
committerktimoney <kevin.timoney@est.tech>
Tue, 8 Aug 2023 11:38:23 +0000 (12:38 +0100)
Issue-ID: NONRTRIC-880
Change-Id: I5e9864f7ff89e7cca7b013632a554c9437377e30
Signed-off-by: ktimoney <kevin.timoney@est.tech>
install/helm/nrt-base-1/charts/strimzi-kafka/templates/app-kafka.yaml
install/scripts/update_ics_job.sh [new file with mode: 0755]
install/update-pm-log.sh [new file with mode: 0755]
pm-file-converter/Dockerfile
pm-file-converter/common/dataTypes/dataTypes.go [new file with mode: 0644]
pm-file-converter/common/utils/utils.go [new file with mode: 0644]
pm-file-converter/components/kafkacollector/kafkacollector.go [new file with mode: 0644]
pm-file-converter/components/miniocollector/miniocollector.go [new file with mode: 0644]
pm-file-converter/components/xmltransform/xmltransform.go [new file with mode: 0644]
pm-file-converter/main.go

index dcc0bb8..de55ff6 100644 (file)
@@ -22,7 +22,7 @@ metadata:
   namespace: nonrtric
 spec:
   kafka:
-    version: 3.3.1
+    version: 3.5.0
     replicas: 1
     listeners:
       - name: plain
diff --git a/install/scripts/update_ics_job.sh b/install/scripts/update_ics_job.sh
new file mode 100755 (executable)
index 0000000..b8788a6
--- /dev/null
@@ -0,0 +1,52 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# args: <job-id> <job-index-suffix> [<access-token>]
+# job file shall exist in file "".job.json"
+update_ics_job() {
+
+    ICS_PROXY_PORT=$(kubectl get svc -n nonrtric informationservice --output jsonpath='{.spec.ports[?(@.name=="http")].nodePort}')
+    echo "NodePort to ics: "$ICS_PROXY_PORT
+    JOB=$(<.job.json)
+    echo $JOB
+    retcode=1
+    echo "Updating job $1"
+    while [ $retcode -ne 0 ]; do
+        if [ -z "$2" ]; then
+            __bearer=""
+        else
+            __bearer="Authorization: Bearer $TOKEN"
+        fi
+        STAT=$(curl -s -X PUT -w '%{http_code}' -H accept:application/json -H Content-Type:application/json http://$KUBERNETESHOST:$ICS_PROXY_PORT/data-consumer/v1/info-jobs/$1 --data-binary @.job.json -H "$__bearer" )
+        retcode=$?
+        echo "curl return code: $retcode"
+        if [ $retcode -eq 0 ]; then
+            status=${STAT:${#STAT}-3}
+            echo "http status code: "$status
+            if [ "$status" == "200" ]; then
+                echo "Job created ok"
+            elif [ "$status" == "201" ]; then
+                echo "Job created ok"
+            else
+                retcode=1
+            fi
+        fi
+        sleep 1
+    done
+}
diff --git a/install/update-pm-log.sh b/install/update-pm-log.sh
new file mode 100755 (executable)
index 0000000..c523fce
--- /dev/null
@@ -0,0 +1,88 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+. scripts/kube_get_controlplane_host.sh
+
+# Generic error printout function
+# args: <numeric-response-code> <descriptive-string>
+check_error() {
+    if [ $1 -ne 0 ]; then
+        echo "Failed: $2"
+        echo "Exiting..."
+        exit 1
+    fi
+}
+
+export KUBERNETESHOST=$(kube_get_controlplane_host)
+if [ $? -ne 0 ]; then
+    echo $KUBERNETESHOST
+    echo "Exiting"
+    exit 1
+fi
+
+echo "Kubernetes control plane host: $KUBERNETESHOST"
+
+. scripts/kube_get_nodeport.sh
+. scripts/get_influxdb2_token.sh
+. scripts/create_influxdb2_bucket.sh
+. scripts/update_ics_job.sh
+
+echo "Installation of pm to influx job"
+
+echo " Retriving influxdb2 access token..."
+INFLUXDB2_TOKEN=$(get_influxdb2_token influxdb2-0 nonrtric)
+
+
+bucket=pm-logg-bucket
+echo "Creating bucket $bucket in influxdb2"
+create_influxdb2_bucket influxdb2-0 nonrtric $bucket
+
+. scripts/populate_keycloak.sh
+
+cid="console-setup"
+TOKEN=$(get_client_token nonrtric-realm $cid)
+
+JOB='{
+       "info_type_id": "PmData",
+       "job_owner": "console",
+       "job_definition": {
+          "filter": {
+             "sourceNames": ["node2-1"],
+             "measObjInstIds": [],
+             "measTypeSpecs": [
+                {
+                   "measuredObjClass": "NRCellDU",
+                   "measTypes": [
+                      "pmCounterNumber102"
+                   ]
+                }
+             ],
+             "measuredEntityDns": []
+          },
+          "deliveryInfo": {
+             "topic": "pmreports",
+             "bootStrapServers": "kafka-1-kafka-bootstrap.nonrtric:9097"
+          }
+       }
+    }'
+echo $JOB > .job.json
+update_ics_job pmlog $TOKEN
+
+echo "done"
+
index 80867f2..6248698 100644 (file)
@@ -20,6 +20,8 @@ FROM golang:1.20.3-buster AS build
 WORKDIR /app
 
 COPY main.go .
+ADD common common
+ADD components components
 RUN go mod init main
 RUN go mod tidy
 
diff --git a/pm-file-converter/common/dataTypes/dataTypes.go b/pm-file-converter/common/dataTypes/dataTypes.go
new file mode 100644 (file)
index 0000000..66b462c
--- /dev/null
@@ -0,0 +1,230 @@
+// -
+//
+//      ========================LICENSE_START=================================
+//      O-RAN-SC
+//      %%
+//      Copyright (C) 2023: Nordix Foundation
+//      %%
+//      Licensed under the Apache License, Version 2.0 (the "License");
+//      you may not use this file except in compliance with the License.
+//      You may obtain a copy of the License at
+//
+//           http://www.apache.org/licenses/LICENSE-2.0
+//
+//      Unless required by applicable law or agreed to in writing, software
+//      distributed under the License is distributed on an "AS IS" BASIS,
+//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//      See the License for the specific language governing permissions and
+//      limitations under the License.
+//      ========================LICENSE_END===================================
+
+package dataTypes
+
+import (
+        "encoding/xml"
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// // Data type for event xml file download
+type XmlFileEventHeader struct {
+       ProductName        string `json:"productName"`
+       VendorName         string `json:"vendorName"`
+       Location           string `json:"location"`
+       Compression        string `json:"compression"`
+       SourceName         string `json:"sourceName"`
+       FileFormatType     string `json:"fileFormatType"`
+       FileFormatVersion  string `json:"fileFormatVersion"`
+       StartEpochMicrosec int64  `json:"startEpochMicrosec"`
+       LastEpochMicrosec  int64  `json:"lastEpochMicrosec"`
+       Name               string `json:"name"`
+       ChangeIdentifier   string `json:"changeIdentifier"`
+       InternalLocation   string `json:"internalLocation"`
+       TimeZoneOffset     string `json:"timeZoneOffset"`
+       ObjectStoreBucket  string `json:"objectStoreBucket"`
+}
+
+// Data types for input xml file
+type MeasCollecFile struct {
+       XMLName        xml.Name `xml:"measCollecFile"`
+       Text           string   `xml:",chardata"`
+       Xmlns          string   `xml:"xmlns,attr"`
+       Xsi            string   `xml:"xsi,attr"`
+       SchemaLocation string   `xml:"schemaLocation,attr"`
+       FileHeader     struct {
+               Text              string `xml:",chardata"`
+               FileFormatVersion string `xml:"fileFormatVersion,attr"`
+               VendorName        string `xml:"vendorName,attr"`
+               DnPrefix          string `xml:"dnPrefix,attr"`
+               FileSender        struct {
+                       Text        string `xml:",chardata"`
+                       LocalDn     string `xml:"localDn,attr"`
+                       ElementType string `xml:"elementType,attr"`
+               } `xml:"fileSender"`
+               MeasCollec struct {
+                       Text      string `xml:",chardata"`
+                       BeginTime string `xml:"beginTime,attr"`
+               } `xml:"measCollec"`
+       } `xml:"fileHeader"`
+       MeasData struct {
+               Text           string `xml:",chardata"`
+               ManagedElement struct {
+                       Text      string `xml:",chardata"`
+                       LocalDn   string `xml:"localDn,attr"`
+                       SwVersion string `xml:"swVersion,attr"`
+               } `xml:"managedElement"`
+               MeasInfo []struct {
+                       Text       string `xml:",chardata"`
+                       MeasInfoId string `xml:"measInfoId,attr"`
+                       Job        struct {
+                               Text  string `xml:",chardata"`
+                               JobId string `xml:"jobId,attr"`
+                       } `xml:"job"`
+                       GranPeriod struct {
+                               Text     string `xml:",chardata"`
+                               Duration string `xml:"duration,attr"`
+                               EndTime  string `xml:"endTime,attr"`
+                       } `xml:"granPeriod"`
+                       RepPeriod struct {
+                               Text     string `xml:",chardata"`
+                               Duration string `xml:"duration,attr"`
+                       } `xml:"repPeriod"`
+                       MeasType []struct {
+                               Text string `xml:",chardata"`
+                               P    string `xml:"p,attr"`
+                       } `xml:"measType"`
+                       MeasValue []struct {
+                               Text       string `xml:",chardata"`
+                               MeasObjLdn string `xml:"measObjLdn,attr"`
+                               R          []struct {
+                                       Text string `xml:",chardata"`
+                                       P    string `xml:"p,attr"`
+                               } `xml:"r"`
+                               Suspect string `xml:"suspect"`
+                       } `xml:"measValue"`
+               } `xml:"measInfo"`
+       } `xml:"measData"`
+       FileFooter struct {
+               Text       string `xml:",chardata"`
+               MeasCollec struct {
+                       Text    string `xml:",chardata"`
+                       EndTime string `xml:"endTime,attr"`
+               } `xml:"measCollec"`
+       } `xml:"fileFooter"`
+}
+
+// Splitted in sevreal part to allow add/remove in lists
+type MeasResults struct {
+       P      int    `json:"p"`
+       SValue string `json:"sValue"`
+}
+
+type MeasValues struct {
+       MeasObjInstID   string        `json:"measObjInstId"`
+       SuspectFlag     string        `json:"suspectFlag"`
+       MeasResultsList []MeasResults `json:"measResults"`
+}
+
+type SMeasTypes struct {
+       SMeasType string `json:"sMeasTypesList"`
+}
+
+type MeasInfoList struct {
+       MeasInfoID struct {
+               SMeasInfoID string `json:"sMeasInfoId"`
+       } `json:"measInfoId"`
+       MeasTypes struct {
+               SMeasTypesList []string `json:"sMeasTypesList"`
+       } `json:"measTypes"`
+       MeasValuesList []MeasValues `json:"measValuesList"`
+}
+
+type PMJsonFile struct {
+       Event struct {
+               CommonEventHeader struct {
+                       Domain                  string `json:"domain"`
+                       EventID                 string `json:"eventId"`
+                       Sequence                int    `json:"sequence"`
+                       EventName               string `json:"eventName"`
+                       SourceName              string `json:"sourceName"`
+                       ReportingEntityName     string `json:"reportingEntityName"`
+                       Priority                string `json:"priority"`
+                       StartEpochMicrosec      int64  `json:"startEpochMicrosec"`
+                       LastEpochMicrosec       int64  `json:"lastEpochMicrosec"`
+                       Version                 string `json:"version"`
+                       VesEventListenerVersion string `json:"vesEventListenerVersion"`
+                       TimeZoneOffset          string `json:"timeZoneOffset"`
+               } `json:"commonEventHeader"`
+               Perf3GppFields struct {
+                       Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
+                       MeasDataCollection    struct {
+                               GranularityPeriod             int            `json:"granularityPeriod"`
+                               MeasuredEntityUserName        string         `json:"measuredEntityUserName"`
+                               MeasuredEntityDn              string         `json:"measuredEntityDn"`
+                               MeasuredEntitySoftwareVersion string         `json:"measuredEntitySoftwareVersion"`
+                               SMeasInfoList                 []MeasInfoList `json:"measInfoList"`
+                       } `json:"measDataCollection"`
+               } `json:"perf3gppFields"`
+       } `json:"event"`
+}
+
+type FileDownloadedEvt struct {
+       Filename string `json:"filename"`
+}
+
+type KafkaPayload struct {
+       Msg   *kafka.Message
+       Topic string
+}
+
+// Type for controlling the topic reader
+type ReaderControl struct {
+       Command string
+}
+
+// Type for controlling the topic writer
+type WriterControl struct {
+       Command string
+}
+
+// == API Datatypes ==//
+// Type for supported data types
+type DataType struct {
+       ID                 string `json:"id"`
+       KafkaInputTopic    string `json:"kafkaInputTopic"`
+       InputJobType       string `json:inputJobType`
+       InputJobDefinition struct {
+               KafkaOutputTopic string `json:kafkaOutputTopic`
+       } `json:inputJobDefinition`
+
+       Ext_job         *[]byte
+       Ext_job_created bool
+       Ext_job_id      string
+}
+
+// Type for controlling the job
+type JobControl struct {
+       Command string
+       //Filter  Filter
+}
+
+type AppStates int64
+
+var InfoTypes DataTypes
+
+// Keep all info type jobs, key == type id
+var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
+
+// Type for an infojob
+type TypeJobRecord struct {
+       InfoType        string
+       InputTopic      string
+       Data_in_channel chan *KafkaPayload
+       Reader_control  chan ReaderControl
+       Job_control     chan JobControl
+       GroupId         string
+       ClientId        string
+}
+
+type DataTypes struct {
+       ProdDataTypes []DataType `json:"types"`
+}
diff --git a/pm-file-converter/common/utils/utils.go b/pm-file-converter/common/utils/utils.go
new file mode 100644 (file)
index 0000000..5466d2c
--- /dev/null
@@ -0,0 +1,74 @@
+// -
+//
+//     ========================LICENSE_START=================================
+//     O-RAN-SC
+//     %%
+//     Copyright (C) 2023: Nordix Foundation
+//     %%
+//     Licensed under the Apache License, Version 2.0 (the "License");
+//     you may not use this file except in compliance with the License.
+//     You may obtain a copy of the License at
+//
+//          http://www.apache.org/licenses/LICENSE-2.0
+//
+//     Unless required by applicable law or agreed to in writing, software
+//     distributed under the License is distributed on an "AS IS" BASIS,
+//     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//     See the License for the specific language governing permissions and
+//     limitations under the License.
+//     ========================LICENSE_END===================================
+package utils
+
+import (
+       "bytes"
+       log "github.com/sirupsen/logrus"
+       "main/components/kafkacollector"
+       "net/http"
+)
+
+var httpclient = &http.Client{}
+
+// Send a http request with json (json may be nil)
+func Send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
+
+       // set the HTTP method, url, and request body
+       var req *http.Request
+       var err error
+       if json == nil {
+               req, err = http.NewRequest(method, url, http.NoBody)
+       } else {
+               req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
+               req.Header.Set("Content-Type", "application/json; charset=utf-8")
+       }
+       if err != nil {
+               log.Error("Cannot create http request, method: ", method, " url: ", url)
+               return false
+       }
+
+       if useAuth {
+               token, err := kafkacollector.Fetch_token()
+               if err != nil {
+                       log.Error("Cannot fetch token for http request: ", err)
+                       return false
+               }
+               req.Header.Set("Authorization", "Bearer "+token.TokenValue)
+       }
+
+       log.Debug("HTTP request: ", req)
+
+       log.Debug("Sending http request")
+       resp, err2 := httpclient.Do(req)
+       if err2 != nil {
+               log.Error("Http request error: ", err2)
+               log.Error("Cannot send http request method: ", method, " url: ", url)
+       } else {
+               if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
+                       log.Debug("Accepted http status: ", resp.StatusCode)
+                       resp.Body.Close()
+                       return true
+               }
+               log.Debug("HTTP resp: ", resp)
+               resp.Body.Close()
+       }
+       return false
+}
diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go
new file mode 100644 (file)
index 0000000..d70114c
--- /dev/null
@@ -0,0 +1,456 @@
+// -
+//
+//     ========================LICENSE_START=================================
+//     O-RAN-SC
+//     %%
+//     Copyright (C) 2023: Nordix Foundation
+//     %%
+//     Licensed under the Apache License, Version 2.0 (the "License");
+//     you may not use this file except in compliance with the License.
+//     You may obtain a copy of the License at
+//
+//          http://www.apache.org/licenses/LICENSE-2.0
+//
+//     Unless required by applicable law or agreed to in writing, software
+//     distributed under the License is distributed on an "AS IS" BASIS,
+//     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//     See the License for the specific language governing permissions and
+//     limitations under the License.
+//     ========================LICENSE_END===================================
+package kafkacollector
+
+import (
+       "context"
+       "fmt"
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+       jsoniter "github.com/json-iterator/go"
+       log "github.com/sirupsen/logrus"
+       "golang.org/x/oauth2/clientcredentials"
+       "main/common/dataTypes"
+        "main/components/miniocollector"
+       "os"
+       "time"
+)
+
+var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
+var bootstrapserver = os.Getenv("KAFKA_SERVER")
+var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
+var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
+var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
+
+// Limiter - valid for all jobs
+const parallelism_limiter = 100 //For all jobs
+var jobLimiterChan = make(chan struct{}, parallelism_limiter)
+
+func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
+
+       log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
+
+       topic_ok := false
+       var c *kafka.Consumer = nil
+       running := true
+
+       for topic_ok == false {
+
+               select {
+               case reader_ctrl := <-control_ch:
+                       if reader_ctrl.Command == "EXIT" {
+                               log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
+                               data_ch <- nil //Signal to job handler
+                               running = false
+                               return
+                       }
+               case <-time.After(1 * time.Second):
+                       if !running {
+                               return
+                       }
+                       if c == nil {
+                               c = create_kafka_consumer(type_id, gid, cid)
+                               if c == nil {
+                                       log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
+                               } else {
+                                       log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
+                               }
+                       }
+                       if c != nil && topic_ok == false {
+                               err := c.SubscribeTopics([]string{topic}, nil)
+                               if err != nil {
+                                       log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying --  error details: ", err)
+                               } else {
+                                       log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
+                                       topic_ok = true
+                               }
+                       }
+               }
+       }
+       log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
+
+       var event_chan = make(chan int)
+       go func() {
+               for {
+                       select {
+                       case evt := <-c.Events():
+                               switch evt.(type) {
+                               case kafka.OAuthBearerTokenRefresh:
+                                       log.Debug("New consumer token needed: ", evt)
+                                       token, err := Fetch_token()
+                                       if err != nil {
+                                               log.Warning("Cannot cannot fetch token: ", err)
+                                               c.SetOAuthBearerTokenFailure(err.Error())
+                                       } else {
+                                               setTokenError := c.SetOAuthBearerToken(*token)
+                                               if setTokenError != nil {
+                                                       log.Warning("Cannot cannot set token: ", setTokenError)
+                                                       c.SetOAuthBearerTokenFailure(setTokenError.Error())
+                                               }
+                                       }
+                               default:
+                                       log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
+                               }
+
+                       case msg := <-event_chan:
+                               if msg == 0 {
+                                       return
+                               }
+                       case <-time.After(1 * time.Second):
+                               if !running {
+                                       return
+                               }
+                       }
+               }
+       }()
+
+       go func() {
+               for {
+                       for {
+                               select {
+                               case reader_ctrl := <-control_ch:
+                                       if reader_ctrl.Command == "EXIT" {
+                                               event_chan <- 0
+                                               log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
+                                               data_ch <- nil //Signal to job handler
+                                               defer c.Close()
+                                               return
+                                       }
+                               default:
+
+                                       ev := c.Poll(1000)
+                                       if ev == nil {
+                                               log.Debug("Topic Reader for type: ", type_id, "  Nothing to consume on topic: ", topic)
+                                               continue
+                                       }
+                                       switch e := ev.(type) {
+                                       case *kafka.Message:
+                                               var kmsg dataTypes.KafkaPayload
+                                               kmsg.Msg = e
+
+                                               c.Commit()
+
+                                               data_ch <- &kmsg
+                                               log.Debug("Reader msg: ", &kmsg)
+                                               log.Debug("Reader - data_ch ", data_ch)
+                                       case kafka.Error:
+                                               fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+
+                                       case kafka.OAuthBearerTokenRefresh:
+                                               log.Debug("New consumer token needed: ", ev)
+                                               token, err := Fetch_token()
+                                               if err != nil {
+                                                       log.Warning("Cannot cannot fetch token: ", err)
+                                                       c.SetOAuthBearerTokenFailure(err.Error())
+                                               } else {
+                                                       setTokenError := c.SetOAuthBearerToken(*token)
+                                                       if setTokenError != nil {
+                                                               log.Warning("Cannot cannot set token: ", setTokenError)
+                                                               c.SetOAuthBearerTokenFailure(setTokenError.Error())
+                                                       }
+                                               }
+                                       default:
+                                               fmt.Printf("Ignored %v\n", e)
+                                       }
+                               }
+                       }
+               }
+       }()
+}
+
+func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {
+
+       var kafka_producer *kafka.Producer
+
+       running := true
+       log.Info("Topic writer starting")
+
+       // Wait for kafka producer to become available - and be prepared to exit the writer
+       for kafka_producer == nil {
+               select {
+               case writer_ctl := <-control_ch:
+                       if writer_ctl.Command == "EXIT" {
+                               //ignore cmd
+                       }
+               default:
+                       kafka_producer = start_producer()
+                       if kafka_producer == nil {
+                               log.Debug("Could not start kafka producer - retrying")
+                               time.Sleep(1 * time.Second)
+                       } else {
+                               log.Debug("Kafka producer started")
+                       }
+               }
+       }
+
+       var event_chan = make(chan int)
+       go func() {
+               for {
+                       select {
+                       case evt := <-kafka_producer.Events():
+                               switch evt.(type) {
+                               case *kafka.Message:
+                                       m := evt.(*kafka.Message)
+
+                                       if m.TopicPartition.Error != nil {
+                                               log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
+                                       } else {
+                                               log.Debug("Dumping topic writer event,  message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
+                                       }
+                               case kafka.Error:
+                                       log.Debug("Dumping topic writer event, error: ", evt)
+                               case kafka.OAuthBearerTokenRefresh:
+                                       log.Debug("New producer token needed: ", evt)
+                                       token, err := Fetch_token()
+                                       if err != nil {
+                                               log.Warning("Cannot cannot fetch token: ", err)
+                                               kafka_producer.SetOAuthBearerTokenFailure(err.Error())
+                                       } else {
+                                               setTokenError := kafka_producer.SetOAuthBearerToken(*token)
+                                               if setTokenError != nil {
+                                                       log.Warning("Cannot cannot set token: ", setTokenError)
+                                                       kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
+                                               }
+                                       }
+                               default:
+                                       log.Debug("Dumping topic writer event, unknown: ", evt)
+                               }
+
+                       case msg := <-event_chan:
+                               if msg == 0 {
+                                       return
+                               }
+                       case <-time.After(1 * time.Second):
+                               if !running {
+                                       return
+                               }
+                       }
+               }
+       }()
+       go func() {
+               for {
+                       select {
+                       case writer_ctl := <-control_ch:
+                               if writer_ctl.Command == "EXIT" {
+                                       // ignore - wait for channel signal
+                               }
+
+                       case kmsg := <-data_ch:
+                               if kmsg == nil {
+                                       event_chan <- 0
+                                       log.Info("Topic writer stopped by channel signal - start_topic_writer")
+                                       defer kafka_producer.Close()
+                                       return
+                               }
+
+                               retries := 10
+                               msg_ok := false
+                               var err error
+                               for retry := 1; retry <= retries && msg_ok == false; retry++ {
+                                       err = kafka_producer.Produce(&kafka.Message{
+                                               TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
+                                               Value:          kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
+
+                                       if err == nil {
+                                               msg_ok = true
+                                               log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
+                                       } else {
+                                               log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
+                                               time.Sleep(time.Duration(retry) * time.Second)
+                                       }
+                               }
+                               if !msg_ok {
+                                       log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
+                               }
+                       case <-time.After(1000 * time.Millisecond):
+                               if !running {
+                                       return
+                               }
+                       }
+               }
+       }()
+}
+
+func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
+       var cm kafka.ConfigMap
+       if creds_grant_type == "" {
+               log.Info("Creating kafka plain text consumer for type: ", type_id)
+               cm = kafka.ConfigMap{
+                       "bootstrap.servers":  bootstrapserver,
+                       "group.id":           gid,
+                       "client.id":          cid,
+                       "auto.offset.reset":  "latest",
+                       "enable.auto.commit": false,
+               }
+       } else {
+               log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
+               cm = kafka.ConfigMap{
+                       "bootstrap.servers":  bootstrapserver,
+                       "group.id":           gid,
+                       "client.id":          cid,
+                       "auto.offset.reset":  "latest",
+                       "enable.auto.commit": false,
+                       "sasl.mechanism":     "OAUTHBEARER",
+                       "security.protocol":  "SASL_PLAINTEXT",
+               }
+       }
+       c, err := kafka.NewConsumer(&cm)
+
+       if err != nil {
+               log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
+               return nil
+       }
+
+       log.Info("Created kafka consumer for type: ", type_id, " OK")
+       return c
+}
+
+// Start kafka producer
+func start_producer() *kafka.Producer {
+       log.Info("Creating kafka producer")
+
+       var cm kafka.ConfigMap
+       if creds_grant_type == "" {
+               log.Info("Creating kafka SASL plain text producer")
+               cm = kafka.ConfigMap{
+                       "bootstrap.servers": bootstrapserver,
+               }
+       } else {
+               log.Info("Creating kafka SASL plain text producer")
+               cm = kafka.ConfigMap{
+                       "bootstrap.servers": bootstrapserver,
+                       "sasl.mechanism":    "OAUTHBEARER",
+                       "security.protocol": "SASL_PLAINTEXT",
+               }
+       }
+
+       p, err := kafka.NewProducer(&cm)
+       if err != nil {
+               log.Error("Cannot create kafka producer,", err)
+               return nil
+       }
+       return p
+}
+
+func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+
+       log.Info("Type job", type_id, " started")
+       topic_list := make(map[string]string)
+       topic_list[type_id] = "json-file-ready-kp"
+       topic_list["PmData"] = "json-file-ready-kpadp"
+       running := true
+       for {
+               select {
+               case job_ctl := <-control_ch:
+                       log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
+                       switch job_ctl.Command {
+                       case "EXIT":
+                               //ignore cmd - handled by channel signal
+                       }
+
+               case msg := <-data_in_ch:
+                       if msg == nil {
+                               log.Info("Type job ", type_id, " stopped by channel signal -  start_job_xml_file_data")
+
+                               running = false
+                               return
+                       }
+                       jobLimiterChan <- struct{}{}
+                       go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)
+
+               case <-time.After(1 * time.Second):
+                       if !running {
+                               return
+                       }
+               }
+       }
+}
+
+func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
+       defer func() {
+               <-jobLimiterChan
+       }()
+       start := time.Now()
+       var evt_data dataTypes.XmlFileEventHeader
+
+       err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
+       if err != nil {
+               log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
+               return
+       }
+       log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
+
+       start = time.Now()
+       new_fn := miniocollector.Xml_to_json_conv(&evt_data)
+       if err != nil {
+               log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
+               return
+       }
+       log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String())
+
+       var fde dataTypes.FileDownloadedEvt
+       fde.Filename = new_fn
+       j, err := jsoniter.Marshal(fde)
+
+       if err != nil {
+               log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
+               return
+       }
+       msg.Msg.Value = j
+
+       msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
+       log.Debug("Marshal file-collect event ", time.Since(start).String())
+       log.Debug("Sending file-collect event to output topic(s)", len(topic_list))
+       for _, v := range topic_list {
+               fmt.Println("Output Topic: " + v)
+               var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
+               kmsg.Msg = msg.Msg
+               kmsg.Topic = v
+               data_out_channel <- kmsg
+       }
+}
+
+func Fetch_token() (*kafka.OAuthBearerToken, error) {
+       log.Debug("Get token inline")
+       conf := &clientcredentials.Config{
+               ClientID:     creds_client_id,
+               ClientSecret: creds_client_secret,
+               TokenURL:     creds_service_url,
+       }
+       token, err := conf.Token(context.Background())
+       if err != nil {
+               log.Warning("Cannot fetch access token: ", err)
+               return nil, err
+       }
+       extensions := map[string]string{}
+       log.Debug("=====================================================")
+       log.Debug("token: ", token)
+       log.Debug("=====================================================")
+       log.Debug("TokenValue: ", token.AccessToken)
+       log.Debug("=====================================================")
+       log.Debug("Expiration: ", token.Expiry)
+       t := token.Expiry
+       oauthBearerToken := kafka.OAuthBearerToken{
+               TokenValue: token.AccessToken,
+               Expiration: t,
+               Extensions: extensions,
+       }
+
+       return &oauthBearerToken, nil
+}
diff --git a/pm-file-converter/components/miniocollector/miniocollector.go b/pm-file-converter/components/miniocollector/miniocollector.go
new file mode 100644 (file)
index 0000000..1663194
--- /dev/null
@@ -0,0 +1,151 @@
+// -
+//
+//     ========================LICENSE_START=================================
+//     O-RAN-SC
+//     %%
+//     Copyright (C) 2023: Nordix Foundation
+//     %%
+//     Licensed under the Apache License, Version 2.0 (the "License");
+//     you may not use this file except in compliance with the License.
+//     You may obtain a copy of the License at
+//
+//          http://www.apache.org/licenses/LICENSE-2.0
+//
+//     Unless required by applicable law or agreed to in writing, software
+//     distributed under the License is distributed on an "AS IS" BASIS,
+//     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//     See the License for the specific language governing permissions and
+//     limitations under the License.
+//     ========================LICENSE_END===================================
+package miniocollector
+
+import (
+       "bytes"
+       "compress/gzip"
+       "context"
+       "fmt"
+       jsoniter "github.com/json-iterator/go"
+       "github.com/minio/minio-go/v7"
+       "github.com/minio/minio-go/v7/pkg/credentials"
+       log "github.com/sirupsen/logrus"
+       "io"
+       "main/common/dataTypes"
+       "main/components/xmltransform"
+       "net/url"
+       "os"
+       "strings"
+       "time"
+)
+
+func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
+       filestoreUser := os.Getenv("FILESTORE_USER")
+       filestorePwd := os.Getenv("FILESTORE_PWD")
+       filestoreServer := os.Getenv("FILESTORE_SERVER")
+
+       s3Client, err := minio.New(filestoreServer, &minio.Options{
+               Creds:  credentials.NewStaticV4(filestoreUser, filestorePwd, ""),
+               Secure: false,
+       })
+       if err != nil {
+               log.Fatalln(err)
+       }
+       expiry := time.Second * 24 * 60 * 60 // 1 day.
+       objectName := evt_data.Name
+       bucketName := evt_data.ObjectStoreBucket
+       compresion := evt_data.Compression
+       reqParams := make(url.Values)
+
+       xmlh, err := jsoniter.Marshal(evt_data)
+       if err != nil {
+               fmt.Printf("Error: %s", err)
+               return ""
+       }
+
+       // Generate presigned GET url with lambda function
+       presignedURL, err := s3Client.PresignedGetObject(context.Background(), bucketName, objectName, expiry, reqParams)
+       if err != nil {
+               log.Fatalln(err)
+       }
+       file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
+       newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
+       var buf bytes.Buffer
+       err = gzipWrite(&buf, &file_bytes)
+       upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
+       fmt.Println("")
+
+       return newObjectName
+}
+
+func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) {
+       contentType := "application/json"
+       if strings.HasSuffix(objectName, ".gz") {
+               contentType = "application/gzip"
+       }
+
+       // Upload the xml file with PutObject
+       r := bytes.NewReader(b)
+       tctx := context.Background()
+       if check_minio_bucket(mc, fsbucket) == false {
+               err := create_minio_bucket(mc, fsbucket)
+               if err != nil {
+                       log.Error("Cannot create bucket: ", fsbucket, ", ", err)
+                       return
+               }
+       }
+       ok := false
+       for i := 1; i < 64 && ok == false; i = i * 2 {
+               info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
+               if err != nil {
+
+                       if i == 1 {
+                               log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
+                       } else {
+                               log.Warn("Cannot upload (retry): ", objectName, ", ", err)
+                       }
+                       time.Sleep(time.Duration(i) * time.Second)
+               } else {
+                       log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
+               }
+       }
+}
+
+func create_minio_bucket(mc *minio.Client, bucket string) error {
+       tctx := context.Background()
+       err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
+       if err != nil {
+               // Check to see if we already own this bucket (which happens if you run this twice)
+               exists, errBucketExists := mc.BucketExists(tctx, bucket)
+               if errBucketExists == nil && exists {
+                       log.Debug("Already own bucket:", bucket)
+                       return nil
+               } else {
+                       log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
+                       return err
+               }
+       }
+       log.Debug("Successfully created bucket: ", bucket)
+       return nil
+}
+
+func check_minio_bucket(mc *minio.Client, bucket string) bool {
+       tctx := context.Background()
+       exists, err := mc.BucketExists(tctx, bucket)
+       if err == nil && exists {
+               log.Debug("Already own bucket:", bucket)
+               return true
+       }
+       log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
+       return false
+}
+
+// Write gzipped data to a Writer
+func gzipWrite(w io.Writer, data *[]byte) error {
+       gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
+
+       if err1 != nil {
+               return err1
+       }
+       defer gw.Close()
+       _, err2 := gw.Write(*data)
+       return err2
+}
diff --git a/pm-file-converter/components/xmltransform/xmltransform.go b/pm-file-converter/components/xmltransform/xmltransform.go
new file mode 100644 (file)
index 0000000..8e88d19
--- /dev/null
@@ -0,0 +1,142 @@
+// -
+//
+//      ========================LICENSE_START=================================
+//      O-RAN-SC
+//      %%
+//      Copyright (C) 2023: Nordix Foundation
+//      %%
+//      Licensed under the Apache License, Version 2.0 (the "License");
+//      you may not use this file except in compliance with the License.
+//      You may obtain a copy of the License at
+//
+//           http://www.apache.org/licenses/LICENSE-2.0
+//
+//      Unless required by applicable law or agreed to in writing, software
+//      distributed under the License is distributed on an "AS IS" BASIS,
+//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//      See the License for the specific language governing permissions and
+//      limitations under the License.
+//      ========================LICENSE_END===================================
+
+package xmltransform
+
+import (
+       "bytes"
+       "compress/gzip"
+       "encoding/xml"
+       "errors"
+       "fmt"
+       jsoniter "github.com/json-iterator/go"
+       log "github.com/sirupsen/logrus"
+       "io"
+       "main/common/dataTypes"
+       "net/http"
+       "strconv"
+       "time"
+)
+
+func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) {
+       var f dataTypes.MeasCollecFile
+       start := time.Now()
+       err := xml.Unmarshal(*f_byteValue, &f)
+       if err != nil {
+               return nil, errors.New("Cannot unmarshal xml-file")
+       }
+       log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
+
+       start = time.Now()
+       var pmfile dataTypes.PMJsonFile
+
+       //TODO: Fill in more values
+       pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
+       pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
+       pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
+       pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
+       pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
+
+       for _, it := range f.MeasData.MeasInfo {
+               var mili dataTypes.MeasInfoList
+               mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
+               for _, jt := range it.MeasType {
+                       mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
+               }
+               for _, jt := range it.MeasValue {
+                       var mv dataTypes.MeasValues
+                       mv.MeasObjInstID = jt.MeasObjLdn
+                       mv.SuspectFlag = jt.Suspect
+                       if jt.Suspect == "" {
+                               mv.SuspectFlag = "false"
+                       }
+                       for _, kt := range jt.R {
+                               ni, _ := strconv.Atoi(kt.P)
+                               nv := kt.Text
+                               mr := dataTypes.MeasResults{ni, nv}
+                               mv.MeasResultsList = append(mv.MeasResultsList, mr)
+                       }
+                       mili.MeasValuesList = append(mili.MeasValuesList, mv)
+               }
+
+               pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
+       }
+
+       pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
+
+       //TODO: Fill more values
+       pmfile.Event.CommonEventHeader.Domain = ""    //xfeh.Domain
+       pmfile.Event.CommonEventHeader.EventID = ""   //xfeh.EventID
+       pmfile.Event.CommonEventHeader.Sequence = 0   //xfeh.Sequence
+       pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
+       pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
+       pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
+       pmfile.Event.CommonEventHeader.Priority = ""            //xfeh.Priority
+       pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
+       pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
+       pmfile.Event.CommonEventHeader.Version = ""                 //xfeh.Version
+       pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
+       pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
+
+       log.Debug("Convert xml to json : ", time.Since(start).String())
+
+       start = time.Now()
+       json, err := jsoniter.Marshal(pmfile)
+       log.Debug("Marshal json : ", time.Since(start).String())
+
+       if err != nil {
+               return nil, errors.New("Cannot marshal converted json")
+       }
+       return json, nil
+}
+
+func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte {
+       evt_data := dataTypes.XmlFileEventHeader{}
+       jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evt_data)
+
+       client := new(http.Client)
+
+       request, err := http.NewRequest("GET", inputS3Url, nil)
+       request.Header.Add("Accept-Encoding", "gzip")
+
+       response, err := client.Do(request)
+       defer response.Body.Close()
+
+       // Check that the server actually sent compressed data
+       var reader io.ReadCloser
+       switch compression {
+       case "gzip", "gz":
+               reader, err = gzip.NewReader(response.Body)
+               defer reader.Close()
+       default:
+               reader = response.Body
+       }
+
+       var buf3 bytes.Buffer
+       _, err2 := io.Copy(&buf3, reader)
+       if err2 != nil {
+               log.Error("Error reading response, discarding message, ", err)
+               return nil
+       }
+       file_bytes := buf3.Bytes()
+       fmt.Println("Converting to XML")
+       b, err := xml_to_json_conv(&file_bytes, &evt_data)
+       return b
+}
index d37b0d2..b931a2a 100644 (file)
-//  ============LICENSE_START===============================================
-//  Copyright (C) 2023 Nordix Foundation. All rights reserved.
-//  ========================================================================
-//  Licensed under the Apache License, Version 2.0 (the "License");
-//  you may not use this file except in compliance with the License.
-//  You may obtain a copy of the License at
+// -
 //
-//       http://www.apache.org/licenses/LICENSE-2.0
+//     ========================LICENSE_START=================================
+//     O-RAN-SC
+//     %%
+//     Copyright (C) 2023: Nordix Foundation
+//     %%
+//     Licensed under the Apache License, Version 2.0 (the "License");
+//     you may not use this file except in compliance with the License.
+//     You may obtain a copy of the License at
 //
-//  Unless required by applicable law or agreed to in writing, software
-//  distributed under the License is distributed on an "AS IS" BASIS,
-//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  See the License for the specific language governing permissions and
-//  limitations under the License.
-//  ============LICENSE_END=================================================
+//          http://www.apache.org/licenses/LICENSE-2.0
 //
-
+//     Unless required by applicable law or agreed to in writing, software
+//     distributed under the License is distributed on an "AS IS" BASIS,
+//     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//     See the License for the specific language governing permissions and
+//     limitations under the License.
+//     ========================LICENSE_END===================================
 package main
 
 import (
-       "bytes"
-       "compress/gzip"
-       "context"
-       "crypto/tls"
-       "encoding/json"
-       "encoding/xml"
-       "errors"
        "fmt"
-       "io"
-       "net"
-       "os/signal"
-       "reflect"
-       "strings"
-       "sync"
-       "syscall"
-
+       jsoniter "github.com/json-iterator/go"
+       log "github.com/sirupsen/logrus"
+       "main/common/dataTypes"
+       "main/common/utils"
+       "main/components/kafkacollector"
        "net/http"
        "os"
+       "os/signal"
        "runtime"
-       "strconv"
+       "sync"
+       "syscall"
        "time"
+)
 
-       "github.com/google/uuid"
-       "golang.org/x/oauth2/clientcredentials"
-
-       log "github.com/sirupsen/logrus"
-
-       "github.com/gorilla/mux"
-
-       "net/http/pprof"
+var ics_server = os.Getenv("ICS")
+var self = os.Getenv("SELF")
 
-       "github.com/confluentinc/confluent-kafka-go/kafka"
-       influxdb2 "github.com/influxdata/influxdb-client-go/v2"
-       jsoniter "github.com/json-iterator/go"
-       "github.com/minio/minio-go/v7"
-       "github.com/minio/minio-go/v7/pkg/credentials"
-)
+// This are optional - set if using SASL protocol is used towards kafka
+var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
 
-//== Constants ==//
+var bootstrapserver = os.Getenv("KAFKA_SERVER")
 
-const http_port = 80
-const https_port = 443
 const config_file = "application_configuration.json"
-const server_crt = "server.crt"
-const server_key = "server.key"
-
 const producer_name = "kafka-producer"
 
-const registration_delay_short = 2
-const registration_delay_long = 120
-
-const mutexLocked = 1
-
-const (
-       Init AppStates = iota
-       Running
-       Terminating
-)
+var producer_instance_name string = producer_name
 
 const reader_queue_length = 100 //Per type job
 const writer_queue_length = 100 //Per info job
-const parallelism_limiter = 100 //For all jobs
-
-// This are optional - set if using SASL protocol is used towards kafka
-var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
-var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
-var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
-var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
-
-//== Types ==//
-
-type AppStates int64
-
-type FilterParameters struct {
-       MeasuredEntityDns []string `json:"measuredEntityDns"`
-       MeasTypes         []string `json:"measTypes"`
-       MeasObjClass      []string `json:"measObjClass"`
-       MeasObjInstIds    []string `json:"measObjInstIds"`
-}
-
-type InfoJobDataType struct {
-       InfoJobData struct {
-               KafkaOutputTopic string `json:"kafkaOutputTopic"`
-
-               DbUrl    string `json:"db-url"`
-               DbOrg    string `json:"db-org"`
-               DbBucket string `json:"db-bucket"`
-               DbToken  string `json:"db-token"`
-
-               FilterParams FilterParameters `json:"filter"`
-       } `json:"info_job_data"`
-       InfoJobIdentity  string `json:"info_job_identity"`
-       InfoTypeIdentity string `json:"info_type_identity"`
-       LastUpdated      string `json:"last_updated"`
-       Owner            string `json:"owner"`
-       TargetURI        string `json:"target_uri"`
-}
-
-// Type for an infojob
-type InfoJobRecord struct {
-       job_info     InfoJobDataType
-       output_topic string
-
-       statistics *InfoJobStats
-}
-
-// Type for an infojob
-type TypeJobRecord struct {
-       InfoType        string
-       InputTopic      string
-       data_in_channel chan *KafkaPayload
-       reader_control  chan ReaderControl
-       job_control     chan JobControl
-       groupId         string
-       clientId        string
-
-       statistics *TypeJobStats
-}
-
-// Type for controlling the topic reader
-type ReaderControl struct {
-       command string
-}
-
-// Type for controlling the topic writer
-type WriterControl struct {
-       command string
-}
-
-// Type for controlling the job
-type JobControl struct {
-       command string
-       filter  Filter
-}
-
-type KafkaPayload struct {
-       msg   *kafka.Message
-       topic string
-       jobid string
-}
-
-type FilterMaps struct {
-       sourceNameMap     map[string]bool
-       measObjClassMap   map[string]bool
-       measObjInstIdsMap map[string]bool
-       measTypesMap      map[string]bool
-}
-
-type InfluxJobParameters struct {
-       DbUrl    string
-       DbOrg    string
-       DbBucket string
-       DbToken  string
-}
-
-type Filter struct {
-       JobId       string
-       OutputTopic string
-       filter      FilterMaps
-
-       influxParameters InfluxJobParameters
-}
 
-// Type for info job statistics
-type InfoJobStats struct {
-       out_msg_cnt  int
-       out_data_vol int64
-}
-
-// Type for type job statistics
-type TypeJobStats struct {
-       in_msg_cnt  int
-       in_data_vol int64
-}
-
-// == API Datatypes ==//
-// Type for supported data types
-type DataType struct {
-       ID                 string `json:"id"`
-       KafkaInputTopic    string `json:"kafkaInputTopic"`
-       InputJobType       string `json:inputJobType`
-       InputJobDefinition struct {
-               KafkaOutputTopic string `json:kafkaOutputTopic`
-       } `json:inputJobDefinition`
-
-       ext_job         *[]byte
-       ext_job_created bool
-       ext_job_id      string
-}
-
-type DataTypes struct {
-       ProdDataTypes []DataType `json:"types"`
-}
-
-type Minio_buckets struct {
-       Buckets map[string]bool
-}
-
-//== External data types ==//
-
-// // Data type for event xml file download
-type XmlFileEventHeader struct {
-       ProductName        string `json:"productName"`
-       VendorName         string `json:"vendorName"`
-       Location           string `json:"location"`
-       Compression        string `json:"compression"`
-       SourceName         string `json:"sourceName"`
-       FileFormatType     string `json:"fileFormatType"`
-       FileFormatVersion  string `json:"fileFormatVersion"`
-       StartEpochMicrosec int64  `json:"startEpochMicrosec"`
-       LastEpochMicrosec  int64  `json:"lastEpochMicrosec"`
-       Name               string `json:"name"`
-       ChangeIdentifier   string `json:"changeIdentifier"`
-       InternalLocation   string `json:"internalLocation"`
-       TimeZoneOffset     string `json:"timeZoneOffset"`
-       //ObjectStoreBucket  string `json:"objectStoreBucket"`
-}
-
-// Data types for input xml file
-type MeasCollecFile struct {
-       XMLName        xml.Name `xml:"measCollecFile"`
-       Text           string   `xml:",chardata"`
-       Xmlns          string   `xml:"xmlns,attr"`
-       Xsi            string   `xml:"xsi,attr"`
-       SchemaLocation string   `xml:"schemaLocation,attr"`
-       FileHeader     struct {
-               Text              string `xml:",chardata"`
-               FileFormatVersion string `xml:"fileFormatVersion,attr"`
-               VendorName        string `xml:"vendorName,attr"`
-               DnPrefix          string `xml:"dnPrefix,attr"`
-               FileSender        struct {
-                       Text        string `xml:",chardata"`
-                       LocalDn     string `xml:"localDn,attr"`
-                       ElementType string `xml:"elementType,attr"`
-               } `xml:"fileSender"`
-               MeasCollec struct {
-                       Text      string `xml:",chardata"`
-                       BeginTime string `xml:"beginTime,attr"`
-               } `xml:"measCollec"`
-       } `xml:"fileHeader"`
-       MeasData struct {
-               Text           string `xml:",chardata"`
-               ManagedElement struct {
-                       Text      string `xml:",chardata"`
-                       LocalDn   string `xml:"localDn,attr"`
-                       SwVersion string `xml:"swVersion,attr"`
-               } `xml:"managedElement"`
-               MeasInfo []struct {
-                       Text       string `xml:",chardata"`
-                       MeasInfoId string `xml:"measInfoId,attr"`
-                       Job        struct {
-                               Text  string `xml:",chardata"`
-                               JobId string `xml:"jobId,attr"`
-                       } `xml:"job"`
-                       GranPeriod struct {
-                               Text     string `xml:",chardata"`
-                               Duration string `xml:"duration,attr"`
-                               EndTime  string `xml:"endTime,attr"`
-                       } `xml:"granPeriod"`
-                       RepPeriod struct {
-                               Text     string `xml:",chardata"`
-                               Duration string `xml:"duration,attr"`
-                       } `xml:"repPeriod"`
-                       MeasType []struct {
-                               Text string `xml:",chardata"`
-                               P    string `xml:"p,attr"`
-                       } `xml:"measType"`
-                       MeasValue []struct {
-                               Text       string `xml:",chardata"`
-                               MeasObjLdn string `xml:"measObjLdn,attr"`
-                               R          []struct {
-                                       Text string `xml:",chardata"`
-                                       P    string `xml:"p,attr"`
-                               } `xml:"r"`
-                               Suspect string `xml:"suspect"`
-                       } `xml:"measValue"`
-               } `xml:"measInfo"`
-       } `xml:"measData"`
-       FileFooter struct {
-               Text       string `xml:",chardata"`
-               MeasCollec struct {
-                       Text    string `xml:",chardata"`
-                       EndTime string `xml:"endTime,attr"`
-               } `xml:"measCollec"`
-       } `xml:"fileFooter"`
-}
-
-// Data type for json file
-// Splitted in sevreal part to allow add/remove in lists
-type MeasResults struct {
-       P      int    `json:"p"`
-       SValue string `json:"sValue"`
-}
-
-type MeasValues struct {
-       MeasObjInstID   string        `json:"measObjInstId"`
-       SuspectFlag     string        `json:"suspectFlag"`
-       MeasResultsList []MeasResults `json:"measResults"`
-}
-
-type SMeasTypes struct {
-       SMeasType string `json:"sMeasTypesList"`
-}
-
-type MeasInfoList struct {
-       MeasInfoID struct {
-               SMeasInfoID string `json:"sMeasInfoId"`
-       } `json:"measInfoId"`
-       MeasTypes struct {
-               SMeasTypesList []string `json:"sMeasTypesList"`
-       } `json:"measTypes"`
-       MeasValuesList []MeasValues `json:"measValuesList"`
-}
+var files_volume = os.Getenv("FILES_VOLUME")
 
-type PMJsonFile struct {
-       Event struct {
-               CommonEventHeader struct {
-                       Domain                  string `json:"domain"`
-                       EventID                 string `json:"eventId"`
-                       Sequence                int    `json:"sequence"`
-                       EventName               string `json:"eventName"`
-                       SourceName              string `json:"sourceName"`
-                       ReportingEntityName     string `json:"reportingEntityName"`
-                       Priority                string `json:"priority"`
-                       StartEpochMicrosec      int64  `json:"startEpochMicrosec"`
-                       LastEpochMicrosec       int64  `json:"lastEpochMicrosec"`
-                       Version                 string `json:"version"`
-                       VesEventListenerVersion string `json:"vesEventListenerVersion"`
-                       TimeZoneOffset          string `json:"timeZoneOffset"`
-               } `json:"commonEventHeader"`
-               Perf3GppFields struct {
-                       Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
-                       MeasDataCollection    struct {
-                               GranularityPeriod             int            `json:"granularityPeriod"`
-                               MeasuredEntityUserName        string         `json:"measuredEntityUserName"`
-                               MeasuredEntityDn              string         `json:"measuredEntityDn"`
-                               MeasuredEntitySoftwareVersion string         `json:"measuredEntitySoftwareVersion"`
-                               SMeasInfoList                 []MeasInfoList `json:"measInfoList"`
-                       } `json:"measDataCollection"`
-               } `json:"perf3gppFields"`
-       } `json:"event"`
-}
+var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length)
+var writer_control = make(chan dataTypes.WriterControl, 1)
 
-// Data type for converted json file message
-type FileDownloadedEvt struct {
-       Filename string `json:"filename"`
-}
+const registration_delay_short = 2
+const registration_delay_long = 120
 
 //== Variables ==//
 
@@ -373,37 +66,11 @@ var AppState = Init
 // Lock for all internal data
 var datalock sync.Mutex
 
-var producer_instance_name string = producer_name
-
-// Keep all info type jobs, key == type id
-var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
-
-// Keep all info jobs, key == job id
-var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
-
-var InfoTypes DataTypes
-
-// Limiter - valid for all jobs
-var jobLimiterChan = make(chan struct{}, parallelism_limiter)
-
-// TODO: Config param?
-var bucket_location = "swe"
-
-var httpclient = &http.Client{}
-
-// == Env variables ==//
-var bootstrapserver = os.Getenv("KAFKA_SERVER")
-var files_volume = os.Getenv("FILES_VOLUME")
-var ics_server = os.Getenv("ICS")
-var self = os.Getenv("SELF")
-var filestore_user = os.Getenv("FILESTORE_USER")
-var filestore_pwd = os.Getenv("FILESTORE_PWD")
-var filestore_server = os.Getenv("FILESTORE_SERVER")
-
-var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
-var writer_control = make(chan WriterControl, 1)
-
-var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
+const (
+       Init dataTypes.AppStates = iota
+       Running
+       Terminating
+)
 
 // == Main ==//
 func main() {
@@ -426,55 +93,7 @@ func main() {
                producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
        }
 
-       rtr := mux.NewRouter()
-       rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
-       rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
-       rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
-       rtr.HandleFunc("/statistics", statistics)
-       rtr.HandleFunc("/logging/{level}", logging_level)
-       rtr.HandleFunc("/logging", logging_level)
-       rtr.HandleFunc("/", alive)
-
-       //For perf/mem profiling
-       rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
-
-       http.Handle("/", rtr)
-
-       http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
-
-       cer, err := tls.LoadX509KeyPair(server_crt, server_key)
-       if err != nil {
-               log.Error("Cannot load key and cert - ", err)
-               return
-       }
-       config := &tls.Config{Certificates: []tls.Certificate{cer}}
-       https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
-
-       // Run http
-       go func() {
-               log.Info("Starting http service...")
-               err := http_server.ListenAndServe()
-               if err == http.ErrServerClosed { // graceful shutdown
-                       log.Info("http server shutdown...")
-               } else if err != nil {
-                       log.Error("http server error: ", err)
-               }
-       }()
-
-       //  Run https
-       go func() {
-               log.Info("Starting https service...")
-               err := https_server.ListenAndServe()
-               if err == http.ErrServerClosed { // graceful shutdown
-                       log.Info("https server shutdown...")
-               } else if err != nil {
-                       log.Error("https server error: ", err)
-               }
-       }()
-       check_tcp(strconv.Itoa(http_port))
-       check_tcp(strconv.Itoa(https_port))
-
-       go start_topic_writer(writer_control, data_out_channel)
+       go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
 
        //Setup proc for periodic type registration
        var event_chan = make(chan int) //Channel for stopping the proc
@@ -490,17 +109,6 @@ func main() {
                datalock.Lock()
                defer datalock.Unlock()
                AppState = Terminating
-               http_server.Shutdown(context.Background())
-               https_server.Shutdown(context.Background())
-               // Stopping jobs
-               for key, _ := range TypeJobs {
-                       log.Info("Stopping type job:", key)
-                       for _, dp := range InfoTypes.ProdDataTypes {
-                               if key == dp.ID {
-                                       remove_type_job(dp)
-                               }
-                       }
-               }
        }()
 
        AppState = Running
@@ -512,28 +120,7 @@ func main() {
        fmt.Println("server stopped")
 }
 
-func check_tcp(port string) {
-       log.Info("Checking tcp port: ", port)
-       for true {
-               address := net.JoinHostPort("localhost", port)
-               // 3 second timeout
-               conn, err := net.DialTimeout("tcp", address, 3*time.Second)
-               if err != nil {
-                       log.Info("Checking tcp port: ", port, " failed, retrying...")
-               } else {
-                       if conn != nil {
-                               log.Info("Checking tcp port: ", port, " - OK")
-                               _ = conn.Close()
-                               return
-                       } else {
-                               log.Info("Checking tcp port: ", port, " failed, retrying...")
-                       }
-               }
-       }
-}
-
-//== Core functions ==//
-
+// == Core functions ==//
 // Run periodic registration of producers
 func periodic_registration(evtch chan int) {
        var delay int = 1
@@ -568,7 +155,7 @@ func register_producer() bool {
                log.Error("Registering producer: ", producer_instance_name, " - failed")
                return false
        }
-       data := DataTypes{}
+       data := dataTypes.DataTypes{}
        err = jsoniter.Unmarshal([]byte(file), &data)
        if err != nil {
                log.Error("Cannot parse config file: ", config_file)
@@ -588,13 +175,13 @@ func register_producer() bool {
 
                t1["info_job_data_schema"] = t2
 
-               json, err := json.Marshal(t1)
+               json, err := jsoniter.Marshal(t1)
                if err != nil {
                        log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
                        log.Error("Registering producer: ", producer_instance_name, " - failed")
                        return false
                } else {
-                       ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
+                       ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
                        if !ok {
                                log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
                                log.Error("Registering producer: ", producer_instance_name, " - failed")
@@ -606,1857 +193,42 @@ func register_producer() bool {
        }
 
        log.Debug("Registering types: ", new_type_names)
-       m := make(map[string]interface{})
-       m["supported_info_types"] = new_type_names
-       m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
-       m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
-
-       json, err := json.Marshal(m)
-       if err != nil {
-               log.Error("Cannot create json for producer: ", producer_instance_name)
-               log.Error("Registering producer: ", producer_instance_name, " - failed")
-               return false
-       }
-       ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
-       if !ok {
-               log.Error("Cannot register producer: ", producer_instance_name)
-               log.Error("Registering producer: ", producer_instance_name, " - failed")
-               return false
-       }
        datalock.Lock()
        defer datalock.Unlock()
 
-       var current_type_names []string
-       for _, v := range InfoTypes.ProdDataTypes {
-               current_type_names = append(current_type_names, v.ID)
-               if contains_str(new_type_names, v.ID) {
-                       //Type exist
-                       log.Debug("Type ", v.ID, " exists")
-                       create_ext_job(v)
-               } else {
-                       //Type is removed
-                       log.Info("Removing type job for type: ", v.ID, " Type not in configuration")
-                       remove_type_job(v)
-               }
-       }
-
        for _, v := range data.ProdDataTypes {
-               if contains_str(current_type_names, v.ID) {
-                       //Type exist
-                       log.Debug("Type ", v.ID, " exists")
-                       create_ext_job(v)
-               } else {
-                       //Type is new
-                       log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
-                       start_type_job(v)
-               }
+               log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
+               start_type_job(v)
        }
 
-       InfoTypes = data
-       log.Debug("Datatypes: ", InfoTypes)
-
+       dataTypes.InfoTypes = data
+       log.Debug("Datatypes: ", dataTypes.InfoTypes)
        log.Info("Registering producer: ", producer_instance_name, " - OK")
        return true
 }
 
-func remove_type_job(dp DataType) {
-       log.Info("Removing type job: ", dp.ID)
-       j, ok := TypeJobs[dp.ID]
-       if ok {
-               j.reader_control <- ReaderControl{"EXIT"}
-       }
-
-       if dp.ext_job_created == true {
-               dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
-               ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
-               if !ok {
-                       log.Error("Cannot delete job: ", dp.ext_job_id)
-               }
-               dp.ext_job_created = false
-               dp.ext_job = nil
-       }
-
-}
-
-func start_type_job(dp DataType) {
+func start_type_job(dp dataTypes.DataType) {
        log.Info("Starting type job: ", dp.ID)
-       job_record := TypeJobRecord{}
+       job_record := dataTypes.TypeJobRecord{}
 
-       job_record.job_control = make(chan JobControl, 1)
-       job_record.reader_control = make(chan ReaderControl, 1)
-       job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
+       job_record.Job_control = make(chan dataTypes.JobControl, 1)
+       job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
+       job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
        job_record.InfoType = dp.ID
        job_record.InputTopic = dp.KafkaInputTopic
-       job_record.groupId = "kafka-procon-" + dp.ID
-       job_record.clientId = dp.ID + "-" + os.Getenv("KP")
-       var stats TypeJobStats
-       job_record.statistics = &stats
+       job_record.GroupId = "kafka-procon-" + dp.ID
+       job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
 
        switch dp.ID {
        case "xml-file-data-to-filestore":
-               go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
+               go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
        case "xml-file-data":
-               go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
-       case "json-file-data-from-filestore":
-               go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
-       case "json-file-data":
-               go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
-       case "json-file-data-from-filestore-to-influx":
-               go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
-
+               go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
        default:
        }
 
-       go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)
+       go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
 
-       TypeJobs[dp.ID] = job_record
+       dataTypes.TypeJobs[dp.ID] = job_record
        log.Debug("Type job input type: ", dp.InputJobType)
-       create_ext_job(dp)
-}
-
-func create_ext_job(dp DataType) {
-       if dp.InputJobType != "" {
-               jb := make(map[string]interface{})
-               jb["info_type_id"] = dp.InputJobType
-               jb["job_owner"] = "console" //TODO:
-               jb["status_notification_uri"] = "http://callback:80/post"
-               jb1 := make(map[string]interface{})
-               jb["job_definition"] = jb1
-               jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic
-
-               json, err := json.Marshal(jb)
-               dp.ext_job_created = false
-               dp.ext_job = nil
-               if err != nil {
-                       log.Error("Cannot create json for type: ", dp.InputJobType)
-                       return
-               }
-
-               dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
-               ok := false
-               for !ok {
-                       ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
-                       if !ok {
-                               log.Error("Cannot register job: ", dp.InputJobType)
-                       }
-               }
-               log.Debug("Registered job ok: ", dp.InputJobType)
-               dp.ext_job_created = true
-               dp.ext_job = &json
-       }
-}
-
-func remove_info_job(jobid string) {
-       log.Info("Removing info job: ", jobid)
-       filter := Filter{}
-       filter.JobId = jobid
-       jc := JobControl{}
-       jc.command = "REMOVE-FILTER"
-       jc.filter = filter
-       infoJob := InfoJobs[jobid]
-       typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
-       typeJob.job_control <- jc
-       delete(InfoJobs, jobid)
-
-}
-
-// == Helper functions ==//
-
-// Function to check the status of a mutex lock
-func MutexLocked(m *sync.Mutex) bool {
-       state := reflect.ValueOf(m).Elem().FieldByName("state")
-       return state.Int()&mutexLocked == mutexLocked
-}
-
-// Test if slice contains a string
-func contains_str(s []string, e string) bool {
-       for _, a := range s {
-               if a == e {
-                       return true
-               }
-       }
-       return false
-}
-
-// Send a http request with json (json may be nil)
-func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
-
-       // set the HTTP method, url, and request body
-       var req *http.Request
-       var err error
-       if json == nil {
-               req, err = http.NewRequest(method, url, http.NoBody)
-       } else {
-               req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
-               req.Header.Set("Content-Type", "application/json; charset=utf-8")
-       }
-       if err != nil {
-               log.Error("Cannot create http request, method: ", method, " url: ", url)
-               return false
-       }
-
-       if useAuth {
-               token, err := fetch_token()
-               if err != nil {
-                       log.Error("Cannot fetch token for http request: ", err)
-                       return false
-               }
-               req.Header.Set("Authorization", "Bearer "+token.TokenValue)
-       }
-
-       log.Debug("HTTP request: ", req)
-
-       log.Debug("Sending http request")
-       resp, err2 := httpclient.Do(req)
-       if err2 != nil {
-               log.Error("Http request error: ", err2)
-               log.Error("Cannot send http request method: ", method, " url: ", url)
-       } else {
-               if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
-                       log.Debug("Accepted http status: ", resp.StatusCode)
-                       resp.Body.Close()
-                       return true
-               }
-               log.Debug("HTTP resp: ", resp)
-               resp.Body.Close()
-       }
-       return false
-}
-
-func fetch_token() (*kafka.OAuthBearerToken, error) {
-       log.Debug("Get token inline")
-       conf := &clientcredentials.Config{
-               ClientID:     creds_client_id,
-               ClientSecret: creds_client_secret,
-               TokenURL:     creds_service_url,
-       }
-       token, err := conf.Token(context.Background())
-       if err != nil {
-               log.Warning("Cannot fetch access token: ", err)
-               return nil, err
-       }
-       extensions := map[string]string{}
-       log.Debug("=====================================================")
-       log.Debug("token: ", token)
-       log.Debug("=====================================================")
-       log.Debug("TokenValue: ", token.AccessToken)
-       log.Debug("=====================================================")
-       log.Debug("Expiration: ", token.Expiry)
-       t := token.Expiry
-       oauthBearerToken := kafka.OAuthBearerToken{
-               TokenValue: token.AccessToken,
-               Expiration: t,
-               Extensions: extensions,
-       }
-
-       return &oauthBearerToken, nil
-}
-
-// Function to print memory details
-// https://pkg.go.dev/runtime#MemStats
-func PrintMemUsage() {
-       if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
-               var m runtime.MemStats
-               runtime.ReadMemStats(&m)
-               fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
-               fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
-               fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
-               fmt.Printf("\tNumGC = %v\n", m.NumGC)
-               fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
-               fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
-               fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
-               fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
-               fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
-               fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
-               fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
-       }
-}
-
-func bToMb(b uint64) uint64 {
-       return b / 1024 / 1024
-}
-
-func generate_uuid_from_type(s string) string {
-       if len(s) > 16 {
-               s = s[:16]
-       }
-       for len(s) < 16 {
-               s = s + "0"
-       }
-       b := []byte(s)
-       b = b[:16]
-       uuid, _ := uuid.FromBytes(b)
-       return uuid.String()
-}
-
-// Write gzipped data to a Writer
-func gzipWrite(w io.Writer, data *[]byte) error {
-       gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
-
-       if err1 != nil {
-               return err1
-       }
-       defer gw.Close()
-       _, err2 := gw.Write(*data)
-       return err2
-}
-
-// Write gunzipped data from Reader to a Writer
-func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
-       gr, err1 := gzip.NewReader(data)
-
-       if err1 != nil {
-               return err1
-       }
-       defer gr.Close()
-       data2, err2 := io.ReadAll(gr)
-       if err2 != nil {
-               return err2
-       }
-       _, err3 := w.Write(data2)
-       if err3 != nil {
-               return err3
-       }
-       return nil
-}
-
-func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
-       tctx := context.Background()
-       err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
-       if err != nil {
-               // Check to see if we already own this bucket (which happens if you run this twice)
-               exists, errBucketExists := mc.BucketExists(tctx, bucket)
-               if errBucketExists == nil && exists {
-                       log.Debug("Already own bucket:", bucket)
-                       add_bucket(client_id, bucket)
-                       return nil
-               } else {
-                       log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
-                       return err
-               }
-       }
-       log.Debug("Successfully created bucket: ", bucket)
-       add_bucket(client_id, bucket)
-       return nil
-}
-
-func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
-       ok := bucket_exist(client_id, bucket)
-       if ok {
-               return true
-       }
-       tctx := context.Background()
-       exists, err := mc.BucketExists(tctx, bucket)
-       if err == nil && exists {
-               log.Debug("Already own bucket:", bucket)
-               return true
-       }
-       log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
-       return false
-}
-
-func add_bucket(minio_id string, bucket string) {
-       datalock.Lock()
-       defer datalock.Unlock()
-
-       b, ok := minio_bucketlist[minio_id]
-       if !ok {
-               b = Minio_buckets{}
-               b.Buckets = make(map[string]bool)
-       }
-       b.Buckets[bucket] = true
-       minio_bucketlist[minio_id] = b
-}
-
-func bucket_exist(minio_id string, bucket string) bool {
-       datalock.Lock()
-       defer datalock.Unlock()
-
-       b, ok := minio_bucketlist[minio_id]
-       if !ok {
-               return false
-       }
-       _, ok = b.Buckets[bucket]
-       return ok
-}
-
-//== http api functions ==//
-
-// create/update job
-func create_job(w http.ResponseWriter, req *http.Request) {
-       log.Debug("Create job, http method: ", req.Method)
-       if req.Method != http.MethodPost {
-               log.Error("Create job, http method not allowed")
-               w.WriteHeader(http.StatusMethodNotAllowed)
-               return
-       }
-       ct := req.Header.Get("Content-Type")
-       if ct != "application/json" {
-               log.Error("Create job, bad content type")
-               http.Error(w, "Bad content type", http.StatusBadRequest)
-               return
-       }
-
-       var t InfoJobDataType
-       err := json.NewDecoder(req.Body).Decode(&t)
-       if err != nil {
-               log.Error("Create job, cannot parse json,", err)
-               http.Error(w, "Cannot parse json", http.StatusBadRequest)
-               return
-       }
-       log.Debug("Creating job, id: ", t.InfoJobIdentity)
-       datalock.Lock()
-       defer datalock.Unlock()
-
-       job_id := t.InfoJobIdentity
-       job_record, job_found := InfoJobs[job_id]
-       type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
-       if !job_found {
-               if !found_type {
-                       log.Error("Type ", t.InfoTypeIdentity, " does not exist")
-                       http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
-                       return
-               }
-       } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
-               log.Error("Job cannot change type")
-               http.Error(w, "Job cannot change type", http.StatusBadRequest)
-               return
-       } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
-               log.Error("Job cannot change topic")
-               http.Error(w, "Job cannot change topic", http.StatusBadRequest)
-               return
-       } else if !found_type {
-               //Should never happen, if the type is removed then job is stopped
-               log.Error("Type ", t.InfoTypeIdentity, " does not exist")
-               http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
-               return
-       }
-
-       if !job_found {
-               job_record = InfoJobRecord{}
-               job_record.job_info = t
-               output_topic := t.InfoJobData.KafkaOutputTopic
-               job_record.output_topic = t.InfoJobData.KafkaOutputTopic
-               log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic)
-
-               var stats InfoJobStats
-               job_record.statistics = &stats
-
-               filter := Filter{}
-               filter.JobId = job_id
-               filter.OutputTopic = job_record.output_topic
-
-               jc := JobControl{}
-
-               jc.command = "ADD-FILTER"
-
-               if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
-                       fm := FilterMaps{}
-                       fm.sourceNameMap = make(map[string]bool)
-                       fm.measObjClassMap = make(map[string]bool)
-                       fm.measObjInstIdsMap = make(map[string]bool)
-                       fm.measTypesMap = make(map[string]bool)
-                       if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
-                               for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
-                                       fm.sourceNameMap[v] = true
-                               }
-                       }
-                       if t.InfoJobData.FilterParams.MeasObjClass != nil {
-                               for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
-                                       fm.measObjClassMap[v] = true
-                               }
-                       }
-                       if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
-                               for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
-                                       fm.measObjInstIdsMap[v] = true
-                               }
-                       }
-                       if t.InfoJobData.FilterParams.MeasTypes != nil {
-                               for _, v := range t.InfoJobData.FilterParams.MeasTypes {
-                                       fm.measTypesMap[v] = true
-                               }
-                       }
-                       filter.filter = fm
-               }
-               if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
-                       influxparam := InfluxJobParameters{}
-                       influxparam.DbUrl = t.InfoJobData.DbUrl
-                       influxparam.DbOrg = t.InfoJobData.DbOrg
-                       influxparam.DbBucket = t.InfoJobData.DbBucket
-                       influxparam.DbToken = t.InfoJobData.DbToken
-                       filter.influxParameters = influxparam
-               }
-
-               jc.filter = filter
-               InfoJobs[job_id] = job_record
-
-               type_job_record.job_control <- jc
-
-       } else {
-               //TODO
-               //Update job
-       }
-}
-
-// delete job
-func delete_job(w http.ResponseWriter, req *http.Request) {
-       if req.Method != http.MethodDelete {
-               w.WriteHeader(http.StatusMethodNotAllowed)
-               return
-       }
-       datalock.Lock()
-       defer datalock.Unlock()
-
-       vars := mux.Vars(req)
-
-       if id, ok := vars["job_id"]; ok {
-               if _, ok := InfoJobs[id]; ok {
-                       remove_info_job(id)
-                       w.WriteHeader(http.StatusNoContent)
-                       log.Info("Job ", id, " deleted")
-                       return
-               }
-       }
-       w.WriteHeader(http.StatusNotFound)
-}
-
-// job supervision
-func supervise_job(w http.ResponseWriter, req *http.Request) {
-       if req.Method != http.MethodGet {
-               w.WriteHeader(http.StatusMethodNotAllowed)
-               return
-       }
-       datalock.Lock()
-       defer datalock.Unlock()
-
-       vars := mux.Vars(req)
-
-       log.Debug("Supervising, job: ", vars["job_id"])
-       if id, ok := vars["job_id"]; ok {
-               if _, ok := InfoJobs[id]; ok {
-                       log.Debug("Supervision ok, job", id)
-                       return
-               }
-       }
-       w.WriteHeader(http.StatusNotFound)
-}
-
-// producer supervision
-func supervise_producer(w http.ResponseWriter, req *http.Request) {
-       if req.Method != http.MethodGet {
-               w.WriteHeader(http.StatusMethodNotAllowed)
-               return
-       }
-
-       w.WriteHeader(http.StatusOK)
-}
-
-// producer statistics, all jobs
-func statistics(w http.ResponseWriter, req *http.Request) {
-       if req.Method != http.MethodGet {
-               w.WriteHeader(http.StatusMethodNotAllowed)
-               return
-       }
-       m := make(map[string]interface{})
-       log.Debug("producer statictics, locked? ", MutexLocked(&datalock))
-       datalock.Lock()
-       defer datalock.Unlock()
-       req.Header.Set("Content-Type", "application/json; charset=utf-8")
-       m["number-of-jobs"] = len(InfoJobs)
-       m["number-of-types"] = len(InfoTypes.ProdDataTypes)
-       qm := make(map[string]interface{})
-       m["jobs"] = qm
-       for key, elem := range InfoJobs {
-               jm := make(map[string]interface{})
-               qm[key] = jm
-               jm["type"] = elem.job_info.InfoTypeIdentity
-               typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
-               jm["groupId"] = typeJob.groupId
-               jm["clientID"] = typeJob.clientId
-               jm["input topic"] = typeJob.InputTopic
-               jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
-               jm["output topic"] = elem.output_topic
-               jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
-               jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
-               jm["msg_out (job)"] = elem.statistics.out_msg_cnt
-
-       }
-       json, err := json.Marshal(m)
-       if err != nil {
-               w.WriteHeader(http.StatusInternalServerError)
-               log.Error("Cannot marshal statistics json")
-               return
-       }
-       _, err = w.Write(json)
-       if err != nil {
-               w.WriteHeader(http.StatusInternalServerError)
-               log.Error("Cannot send statistics json")
-               return
-       }
-}
-
-// Simple alive check
-func alive(w http.ResponseWriter, req *http.Request) {
-       //Alive check
-}
-
-// Get/Set logging level
-func logging_level(w http.ResponseWriter, req *http.Request) {
-       vars := mux.Vars(req)
-       if level, ok := vars["level"]; ok {
-               if req.Method == http.MethodPut {
-                       switch level {
-                       case "trace":
-                               log.SetLevel(log.TraceLevel)
-                       case "debug":
-                               log.SetLevel(log.DebugLevel)
-                       case "info":
-                               log.SetLevel(log.InfoLevel)
-                       case "warn":
-                               log.SetLevel(log.WarnLevel)
-                       case "error":
-                               log.SetLevel(log.ErrorLevel)
-                       case "fatal":
-                               log.SetLevel(log.FatalLevel)
-                       case "panic":
-                               log.SetLevel(log.PanicLevel)
-                       default:
-                               w.WriteHeader(http.StatusNotFound)
-                       }
-               } else {
-                       w.WriteHeader(http.StatusMethodNotAllowed)
-               }
-       } else {
-               if req.Method == http.MethodGet {
-                       msg := "none"
-                       if log.IsLevelEnabled(log.PanicLevel) {
-                               msg = "panic"
-                       } else if log.IsLevelEnabled(log.FatalLevel) {
-                               msg = "fatal"
-                       } else if log.IsLevelEnabled(log.ErrorLevel) {
-                               msg = "error"
-                       } else if log.IsLevelEnabled(log.WarnLevel) {
-                               msg = "warn"
-                       } else if log.IsLevelEnabled(log.InfoLevel) {
-                               msg = "info"
-                       } else if log.IsLevelEnabled(log.DebugLevel) {
-                               msg = "debug"
-                       } else if log.IsLevelEnabled(log.TraceLevel) {
-                               msg = "trace"
-                       }
-                       w.Header().Set("Content-Type", "application/text")
-                       w.Write([]byte(msg))
-               } else {
-                       w.WriteHeader(http.StatusMethodNotAllowed)
-               }
-       }
-}
-
-func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
-
-       log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
-
-       topic_ok := false
-       var c *kafka.Consumer = nil
-       running := true
-
-       for topic_ok == false {
-
-               select {
-               case reader_ctrl := <-control_ch:
-                       if reader_ctrl.command == "EXIT" {
-                               log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
-                               data_ch <- nil //Signal to job handler
-                               running = false
-                               return
-                       }
-               case <-time.After(1 * time.Second):
-                       if !running {
-                               return
-                       }
-                       if c == nil {
-                               c = create_kafka_consumer(type_id, gid, cid)
-                               if c == nil {
-                                       log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
-                               } else {
-                                       log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
-                               }
-                       }
-                       if c != nil && topic_ok == false {
-                               err := c.SubscribeTopics([]string{topic}, nil)
-                               if err != nil {
-                                       log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying --  error details: ", err)
-                               } else {
-                                       log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
-                                       topic_ok = true
-                               }
-                       }
-               }
-       }
-       log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
-
-       var event_chan = make(chan int)
-       go func() {
-               for {
-                       select {
-                       case evt := <-c.Events():
-                               switch evt.(type) {
-                               case kafka.OAuthBearerTokenRefresh:
-                                       log.Debug("New consumer token needed: ", evt)
-                                       token, err := fetch_token()
-                                       if err != nil {
-                                               log.Warning("Cannot cannot fetch token: ", err)
-                                               c.SetOAuthBearerTokenFailure(err.Error())
-                                       } else {
-                                               setTokenError := c.SetOAuthBearerToken(*token)
-                                               if setTokenError != nil {
-                                                       log.Warning("Cannot cannot set token: ", setTokenError)
-                                                       c.SetOAuthBearerTokenFailure(setTokenError.Error())
-                                               }
-                                       }
-                               default:
-                                       log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
-                               }
-
-                       case msg := <-event_chan:
-                               if msg == 0 {
-                                       return
-                               }
-                       case <-time.After(1 * time.Second):
-                               if !running {
-                                       return
-                               }
-                       }
-               }
-       }()
-
-       go func() {
-               for {
-                       for {
-                               select {
-                               case reader_ctrl := <-control_ch:
-                                       if reader_ctrl.command == "EXIT" {
-                                               event_chan <- 0
-                                               log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
-                                               data_ch <- nil //Signal to job handler
-                                               defer c.Close()
-                                               return
-                                       }
-                               default:
-
-                                       ev := c.Poll(1000)
-                                       if ev == nil {
-                                               log.Debug("Topic Reader for type: ", type_id, "  Nothing to consume on topic: ", topic)
-                                               continue
-                                       }
-                                       switch e := ev.(type) {
-                                       case *kafka.Message:
-                                               var kmsg KafkaPayload
-                                               kmsg.msg = e
-
-                                               c.Commit()
-
-                                               data_ch <- &kmsg
-                                               stats.in_msg_cnt++
-                                               log.Debug("Reader msg: ", &kmsg)
-                                               log.Debug("Reader - data_ch ", data_ch)
-                                       case kafka.Error:
-                                               fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
-
-                                       case kafka.OAuthBearerTokenRefresh:
-                                               log.Debug("New consumer token needed: ", ev)
-                                               token, err := fetch_token()
-                                               if err != nil {
-                                                       log.Warning("Cannot cannot fetch token: ", err)
-                                                       c.SetOAuthBearerTokenFailure(err.Error())
-                                               } else {
-                                                       setTokenError := c.SetOAuthBearerToken(*token)
-                                                       if setTokenError != nil {
-                                                               log.Warning("Cannot cannot set token: ", setTokenError)
-                                                               c.SetOAuthBearerTokenFailure(setTokenError.Error())
-                                                       }
-                                               }
-                                       default:
-                                               fmt.Printf("Ignored %v\n", e)
-                                       }
-                               }
-                       }
-               }
-       }()
-}
-
-func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
-
-       var kafka_producer *kafka.Producer
-
-       running := true
-       log.Info("Topic writer starting")
-
-       // Wait for kafka producer to become available - and be prepared to exit the writer
-       for kafka_producer == nil {
-               select {
-               case writer_ctl := <-control_ch:
-                       if writer_ctl.command == "EXIT" {
-                               //ignore cmd
-                       }
-               default:
-                       kafka_producer = start_producer()
-                       if kafka_producer == nil {
-                               log.Debug("Could not start kafka producer - retrying")
-                               time.Sleep(1 * time.Second)
-                       } else {
-                               log.Debug("Kafka producer started")
-                       }
-               }
-       }
-
-       var event_chan = make(chan int)
-       go func() {
-               for {
-                       select {
-                       case evt := <-kafka_producer.Events():
-                               switch evt.(type) {
-                               case *kafka.Message:
-                                       m := evt.(*kafka.Message)
-
-                                       if m.TopicPartition.Error != nil {
-                                               log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
-                                       } else {
-                                               log.Debug("Dumping topic writer event,  message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
-                                       }
-                               case kafka.Error:
-                                       log.Debug("Dumping topic writer event, error: ", evt)
-                               case kafka.OAuthBearerTokenRefresh:
-                                       log.Debug("New producer token needed: ", evt)
-                                       token, err := fetch_token()
-                                       if err != nil {
-                                               log.Warning("Cannot cannot fetch token: ", err)
-                                               kafka_producer.SetOAuthBearerTokenFailure(err.Error())
-                                       } else {
-                                               setTokenError := kafka_producer.SetOAuthBearerToken(*token)
-                                               if setTokenError != nil {
-                                                       log.Warning("Cannot cannot set token: ", setTokenError)
-                                                       kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
-                                               }
-                                       }
-                               default:
-                                       log.Debug("Dumping topic writer event, unknown: ", evt)
-                               }
-
-                       case msg := <-event_chan:
-                               if msg == 0 {
-                                       return
-                               }
-                       case <-time.After(1 * time.Second):
-                               if !running {
-                                       return
-                               }
-                       }
-               }
-       }()
-       go func() {
-               for {
-                       select {
-                       case writer_ctl := <-control_ch:
-                               if writer_ctl.command == "EXIT" {
-                                       // ignore - wait for channel signal
-                               }
-
-                       case kmsg := <-data_ch:
-                               if kmsg == nil {
-                                       event_chan <- 0
-                                       log.Info("Topic writer stopped by channel signal - start_topic_writer")
-                                       defer kafka_producer.Close()
-                                       return
-                               }
-
-                               retries := 10
-                               msg_ok := false
-                               var err error
-                               for retry := 1; retry <= retries && msg_ok == false; retry++ {
-                                       err = kafka_producer.Produce(&kafka.Message{
-                                               TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
-                                               Value:          kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
-
-                                       if err == nil {
-                                               incr_out_msg_cnt(kmsg.jobid)
-                                               msg_ok = true
-                                               log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
-                                       } else {
-                                               log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
-                                               time.Sleep(time.Duration(retry) * time.Second)
-                                       }
-                               }
-                               if !msg_ok {
-                                       log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
-                               }
-                       case <-time.After(1000 * time.Millisecond):
-                               if !running {
-                                       return
-                               }
-                       }
-               }
-       }()
-}
-
-func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
-       var cm kafka.ConfigMap
-       if creds_grant_type == "" {
-               log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
-               cm = kafka.ConfigMap{
-                       "bootstrap.servers":  bootstrapserver,
-                       "group.id":           gid,
-                       "client.id":          cid,
-                       "auto.offset.reset":  "latest",
-                       "enable.auto.commit": false,
-               }
-       } else {
-               log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
-               cm = kafka.ConfigMap{
-                       "bootstrap.servers":  bootstrapserver,
-                       "group.id":           gid,
-                       "client.id":          cid,
-                       "auto.offset.reset":  "latest",
-                       "enable.auto.commit": false,
-                       "sasl.mechanism":     "OAUTHBEARER",
-                       "security.protocol":  "SASL_PLAINTEXT",
-               }
-       }
-       c, err := kafka.NewConsumer(&cm)
-
-       if err != nil {
-               log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
-               return nil
-       }
-
-       log.Info("Created kafka consumer for type: ", type_id, " OK")
-       return c
-}
-
-// Start kafka producer
-func start_producer() *kafka.Producer {
-       log.Info("Creating kafka producer")
-
-       var cm kafka.ConfigMap
-       if creds_grant_type == "" {
-               log.Info("Creating kafka SASL plain text producer")
-               cm = kafka.ConfigMap{
-                       "bootstrap.servers": bootstrapserver,
-               }
-       } else {
-               log.Info("Creating kafka SASL plain text producer")
-               cm = kafka.ConfigMap{
-                       "bootstrap.servers": bootstrapserver,
-                       "sasl.mechanism":    "OAUTHBEARER",
-                       "security.protocol": "SASL_PLAINTEXT",
-               }
-       }
-
-       p, err := kafka.NewProducer(&cm)
-       if err != nil {
-               log.Error("Cannot create kafka producer,", err)
-               return nil
-       }
-       return p
-}
-
-func start_adminclient() *kafka.AdminClient {
-       log.Info("Creating kafka admin client")
-       a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
-       if err != nil {
-               log.Error("Cannot create kafka admin client,", err)
-               return nil
-       }
-       return a
-}
-
-func create_minio_client(id string) (*minio.Client, *error) {
-       log.Debug("Get minio client")
-       minio_client, err := minio.New(filestore_server, &minio.Options{
-               Secure: false,
-               Creds:  credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
-       })
-       if err != nil {
-               log.Error("Cannot create minio client, ", err)
-               return nil, &err
-       }
-       return minio_client, nil
-}
-
-func incr_out_msg_cnt(jobid string) {
-       j, ok := InfoJobs[jobid]
-       if ok {
-               j.statistics.out_msg_cnt++
-       }
-}
-
-func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
-
-       log.Info("Type job", type_id, " started")
-
-       filters := make(map[string]Filter)
-       topic_list := make(map[string]string)
-       var mc *minio.Client
-       const mc_id = "mc_" + "start_job_xml_file_data"
-       running := true
-       for {
-               select {
-               case job_ctl := <-control_ch:
-                       log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
-                       switch job_ctl.command {
-                       case "EXIT":
-                               //ignore cmd - handled by channel signal
-                       case "ADD-FILTER":
-                               filters[job_ctl.filter.JobId] = job_ctl.filter
-                               log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
-
-                               tmp_topic_list := make(map[string]string)
-                               for k, v := range topic_list {
-                                       tmp_topic_list[k] = v
-                               }
-                               tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
-                               topic_list = tmp_topic_list
-                       case "REMOVE-FILTER":
-                               log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
-
-                               tmp_topic_list := make(map[string]string)
-                               for k, v := range topic_list {
-                                       tmp_topic_list[k] = v
-                               }
-                               delete(tmp_topic_list, job_ctl.filter.JobId)
-                               topic_list = tmp_topic_list
-                       }
-
-               case msg := <-data_in_ch:
-                       if msg == nil {
-                               log.Info("Type job ", type_id, " stopped by channel signal -  start_job_xml_file_data")
-
-                               running = false
-                               return
-                       }
-                       if fsbucket != "" && fvolume == "" {
-                               if mc == nil {
-                                       var err *error
-                                       mc, err = create_minio_client(mc_id)
-                                       if err != nil {
-                                               log.Debug("Cannot create minio client for type job: ", type_id)
-                                       }
-                               }
-                       }
-                       jobLimiterChan <- struct{}{}
-                       go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
-
-               case <-time.After(1 * time.Second):
-                       if !running {
-                               return
-                       }
-               }
-       }
-}
-
-func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
-       defer func() {
-               <-jobLimiterChan
-       }()
-       PrintMemUsage()
-
-       if fvolume == "" && fsbucket == "" {
-               log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
-               return
-       } else if (fvolume != "") && (fsbucket != "") {
-               log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
-               return
-       }
-
-       start := time.Now()
-       var evt_data XmlFileEventHeader
-
-       err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
-       if err != nil {
-               log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
-               return
-       }
-       log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
-
-       var reader io.Reader
-
-       INPUTBUCKET := "ropfiles"
-
-       filename := ""
-       if fvolume != "" {
-               filename = fvolume + "/" + evt_data.Name
-               fi, err := os.Open(filename)
-
-               if err != nil {
-                       log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
-                       return
-               }
-               defer fi.Close()
-               reader = fi
-       } else {
-               filename = evt_data.Name
-               if mc != nil {
-                       tctx := context.Background()
-                       mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
-                       if err != nil {
-                               log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
-                               return
-                       }
-                       if mr == nil {
-                               log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader -  discarding message, error details: ", err)
-                               return
-                       }
-                       reader = mr
-                       defer mr.Close()
-               } else {
-                       log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client -  discarding message")
-                       return
-               }
-       }
-
-       if reader == nil {
-               log.Error("Cannot get: ", filename, " - null reader")
-               return
-       }
-       var file_bytes []byte
-       if strings.HasSuffix(filename, "gz") {
-               start := time.Now()
-               var buf3 bytes.Buffer
-               errb := gunzipReaderToWriter(&buf3, reader)
-               if errb != nil {
-                       log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
-                       return
-               }
-               file_bytes = buf3.Bytes()
-               log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes))
-
-       } else {
-               var buf3 bytes.Buffer
-               _, err2 := io.Copy(&buf3, reader)
-               if err2 != nil {
-                       log.Error("File ", filename, " - cannot be read, discarding message, ", err)
-                       return
-               }
-               file_bytes = buf3.Bytes()
-       }
-       start = time.Now()
-       b, err := xml_to_json_conv(&file_bytes, &evt_data)
-       if err != nil {
-               log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
-               return
-       }
-       log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b))
-
-       new_fn := evt_data.Name + os.Getenv("KP") + ".json"
-       if outputCompression == "gz" {
-               new_fn = new_fn + ".gz"
-               start = time.Now()
-               var buf bytes.Buffer
-               err = gzipWrite(&buf, &b)
-               if err != nil {
-                       log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
-                       return
-               }
-               b = buf.Bytes()
-               log.Debug("Gzip file:  ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes))
-
-       }
-       start = time.Now()
-
-       if fvolume != "" {
-               //Store on disk
-               err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
-               if err != nil {
-                       log.Error("Cannot write file ", new_fn, " - discarding message,", err)
-                       return
-               }
-               log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes))
-       } else if fsbucket != "" {
-               // Store in minio
-               objectName := new_fn
-               if mc != nil {
-
-                       contentType := "application/json"
-                       if strings.HasSuffix(objectName, ".gz") {
-                               contentType = "application/gzip"
-                       }
-
-                       // Upload the xml file with PutObject
-                       r := bytes.NewReader(b)
-                       tctx := context.Background()
-                       if check_minio_bucket(mc, mc_id, fsbucket) == false {
-                               err := create_minio_bucket(mc, mc_id, fsbucket)
-                               if err != nil {
-                                       log.Error("Cannot create bucket: ", fsbucket, ", ", err)
-                                       return
-                               }
-                       }
-                       ok := false
-                       for i := 1; i < 64 && ok == false; i = i * 2 {
-                               info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
-                               if err != nil {
-
-                                       if i == 1 {
-                                               log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
-                                       } else {
-                                               log.Warn("Cannot upload (retry): ", objectName, ", ", err)
-                                       }
-                                       time.Sleep(time.Duration(i) * time.Second)
-                               } else {
-                                       log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
-                                       log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
-                                       ok = true
-                               }
-                       }
-                       if !ok {
-                               log.Error("Cannot upload : ", objectName, ", ", err)
-                       }
-               } else {
-                       log.Error("Cannot upload: ", objectName, ", no client")
-               }
-       }
-
-       start = time.Now()
-       if fvolume == "" {
-               var fde FileDownloadedEvt
-               fde.Filename = new_fn
-               j, err := jsoniter.Marshal(fde)
-
-               if err != nil {
-                       log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
-                       return
-               }
-               msg.msg.Value = j
-       } else {
-               var fde FileDownloadedEvt
-               fde.Filename = new_fn
-               j, err := jsoniter.Marshal(fde)
-
-               if err != nil {
-                       log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
-                       return
-               }
-               msg.msg.Value = j
-       }
-       msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
-       log.Debug("Marshal file-collect event ", time.Since(start).String())
-
-       for k, v := range topic_list {
-               var kmsg *KafkaPayload = new(KafkaPayload)
-               kmsg.msg = msg.msg
-               kmsg.topic = v
-               kmsg.jobid = k
-               data_out_channel <- kmsg
-       }
-}
-
-func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
-       var f MeasCollecFile
-       start := time.Now()
-       err := xml.Unmarshal(*f_byteValue, &f)
-       if err != nil {
-               return nil, errors.New("Cannot unmarshal xml-file")
-       }
-       log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
-
-       start = time.Now()
-       var pmfile PMJsonFile
-
-       pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
-       pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
-       pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
-       pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
-       pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
-
-       for _, it := range f.MeasData.MeasInfo {
-               var mili MeasInfoList
-               mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
-               for _, jt := range it.MeasType {
-                       mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
-               }
-               for _, jt := range it.MeasValue {
-                       var mv MeasValues
-                       mv.MeasObjInstID = jt.MeasObjLdn
-                       mv.SuspectFlag = jt.Suspect
-                       if jt.Suspect == "" {
-                               mv.SuspectFlag = "false"
-                       }
-                       for _, kt := range jt.R {
-                               ni, _ := strconv.Atoi(kt.P)
-                               nv := kt.Text
-                               mr := MeasResults{ni, nv}
-                               mv.MeasResultsList = append(mv.MeasResultsList, mr)
-                       }
-                       mili.MeasValuesList = append(mili.MeasValuesList, mv)
-               }
-
-               pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
-       }
-
-       pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
-
-       //TODO: Fill more values
-       pmfile.Event.CommonEventHeader.Domain = ""    //xfeh.Domain
-       pmfile.Event.CommonEventHeader.EventID = ""   //xfeh.EventID
-       pmfile.Event.CommonEventHeader.Sequence = 0   //xfeh.Sequence
-       pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
-       pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
-       pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
-       pmfile.Event.CommonEventHeader.Priority = ""            //xfeh.Priority
-       pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
-       pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
-       pmfile.Event.CommonEventHeader.Version = ""                 //xfeh.Version
-       pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
-       pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
-
-       log.Debug("Convert xml to json : ", time.Since(start).String())
-
-       start = time.Now()
-       json, err := jsoniter.Marshal(pmfile)
-       log.Debug("Marshal json : ", time.Since(start).String())
-
-       if err != nil {
-               return nil, errors.New("Cannot marshal converted json")
-       }
-       return json, nil
-}
-
-func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
-
-       log.Info("Type job", type_id, " started")
-
-       filters := make(map[string]Filter)
-       filterParams_list := make(map[string]FilterMaps)
-       topic_list := make(map[string]string)
-       var mc *minio.Client
-       const mc_id = "mc_" + "start_job_json_file_data"
-       running := true
-       for {
-               select {
-               case job_ctl := <-control_ch:
-                       log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
-                       switch job_ctl.command {
-                       case "EXIT":
-                       case "ADD-FILTER":
-                               filters[job_ctl.filter.JobId] = job_ctl.filter
-                               log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
-
-                               tmp_filterParams_list := make(map[string]FilterMaps)
-                               for k, v := range filterParams_list {
-                                       tmp_filterParams_list[k] = v
-                               }
-                               tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
-                               filterParams_list = tmp_filterParams_list
-
-                               tmp_topic_list := make(map[string]string)
-                               for k, v := range topic_list {
-                                       tmp_topic_list[k] = v
-                               }
-                               tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
-                               topic_list = tmp_topic_list
-                       case "REMOVE-FILTER":
-                               log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
-
-                               tmp_filterParams_list := make(map[string]FilterMaps)
-                               for k, v := range filterParams_list {
-                                       tmp_filterParams_list[k] = v
-                               }
-                               delete(tmp_filterParams_list, job_ctl.filter.JobId)
-                               filterParams_list = tmp_filterParams_list
-
-                               tmp_topic_list := make(map[string]string)
-                               for k, v := range topic_list {
-                                       tmp_topic_list[k] = v
-                               }
-                               delete(tmp_topic_list, job_ctl.filter.JobId)
-                               topic_list = tmp_topic_list
-                       }
-
-               case msg := <-data_in_ch:
-                       if msg == nil {
-                               log.Info("Type job ", type_id, " stopped by channel signal -  start_job_xml_file_data")
-
-                               running = false
-                               return
-                       }
-                       if objectstore {
-                               if mc == nil {
-                                       var err *error
-                                       mc, err = create_minio_client(mc_id)
-                                       if err != nil {
-                                               log.Debug("Cannot create minio client for type job: ", type_id)
-                                       }
-                               }
-                       }
-                       //TODO: Sort processed file conversions in order (FIFO)
-                       jobLimiterChan <- struct{}{}
-                       go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
-
-               case <-time.After(1 * time.Second):
-                       if !running {
-                               return
-                       }
-               }
-       }
-}
-
-func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
-
-       //Release job limit
-       defer func() {
-               <-jobLimiterChan
-       }()
-
-       PrintMemUsage()
-
-       var evt_data FileDownloadedEvt
-       err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
-       if err != nil {
-               log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
-               return
-       }
-       log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
-
-       var reader io.Reader
-
-       INPUTBUCKET := "pm-files-json"
-       filename := ""
-       if objectstore == false {
-               filename = files_volume + "/" + evt_data.Filename
-               fi, err := os.Open(filename)
-
-               if err != nil {
-                       log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
-                       return
-               }
-               defer fi.Close()
-               reader = fi
-       } else {
-               filename = "/" + evt_data.Filename
-               if mc != nil {
-                       if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
-                               log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
-                               return
-                       }
-                       tctx := context.Background()
-                       mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
-                       if err != nil {
-                               log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
-                               return
-                       }
-                       reader = mr
-                       defer mr.Close()
-               } else {
-                       log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
-                       return
-               }
-       }
-
-       var data *[]byte
-       if strings.HasSuffix(filename, "gz") {
-               start := time.Now()
-               var buf2 bytes.Buffer
-               errb := gunzipReaderToWriter(&buf2, reader)
-               if errb != nil {
-                       log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
-                       return
-               }
-               d := buf2.Bytes()
-               data = &d
-               log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
-       } else {
-
-               start := time.Now()
-               d, err := io.ReadAll(reader)
-               if err != nil {
-                       log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
-                       return
-               }
-               data = &d
-
-               log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
-       }
-
-       for k, v := range filterList {
-
-               var pmfile PMJsonFile
-               start := time.Now()
-               err = jsoniter.Unmarshal(*data, &pmfile)
-               log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
-               if err != nil {
-                       log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
-                       return
-               }
-
-               var kmsg *KafkaPayload = new(KafkaPayload)
-               kmsg.msg = new(kafka.Message)
-               kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
-               log.Debug("topic:", topic_list[k])
-               log.Debug("sourceNameMap:", v.sourceNameMap)
-               log.Debug("measObjClassMap:", v.measObjClassMap)
-               log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
-               log.Debug("measTypesMap:", v.measTypesMap)
-
-               b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
-               if b == nil {
-                       log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
-                       return
-               }
-               kmsg.msg.Value = *b
-
-               kmsg.topic = topic_list[k]
-               kmsg.jobid = k
-
-               data_out_channel <- kmsg
-       }
-
-}
-
-func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
-
-       if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
-               return nil
-       }
-       start := time.Now()
-       j, err := jsoniter.Marshal(&data)
-
-       log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
-
-       if err != nil {
-               log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
-               return nil
-       }
-
-       log.Debug("Filtered json obj: ", resource, " len: ", len(j))
-       return &j
-}
-
-func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
-       filter_req := true
-       start := time.Now()
-       if len(sourceNameMap) != 0 {
-               if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
-                       filter_req = false
-                       return nil
-               }
-       }
-       if filter_req {
-               modified := false
-               var temp_mil []MeasInfoList
-               for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
-
-                       check_cntr := false
-                       var cnt_flags []bool
-                       if len(measTypesMap) > 0 {
-                               c_cntr := 0
-                               var temp_mtl []string
-                               for _, v := range zz.MeasTypes.SMeasTypesList {
-                                       if measTypesMap[v] {
-                                               cnt_flags = append(cnt_flags, true)
-                                               c_cntr++
-                                               temp_mtl = append(temp_mtl, v)
-                                       } else {
-                                               cnt_flags = append(cnt_flags, false)
-                                       }
-                               }
-                               if c_cntr > 0 {
-                                       check_cntr = true
-                                       zz.MeasTypes.SMeasTypesList = temp_mtl
-                               } else {
-                                       modified = true
-                                       continue
-                               }
-                       }
-                       keep := false
-                       var temp_mvl []MeasValues
-                       for _, yy := range zz.MeasValuesList {
-                               keep_class := false
-                               keep_inst := false
-                               keep_cntr := false
-
-                               dna := strings.Split(yy.MeasObjInstID, ",")
-                               instName := dna[len(dna)-1]
-                               cls := strings.Split(dna[len(dna)-1], "=")[0]
-
-                               if len(measObjClassMap) > 0 {
-                                       if measObjClassMap[cls] {
-                                               keep_class = true
-                                       }
-                               } else {
-                                       keep_class = true
-                               }
-
-                               if len(measObjInstIdsMap) > 0 {
-                                       if measObjInstIdsMap[instName] {
-                                               keep_inst = true
-                                       }
-                               } else {
-                                       keep_inst = true
-                               }
-
-                               if check_cntr {
-                                       var temp_mrl []MeasResults
-                                       cnt_p := 1
-                                       for _, v := range yy.MeasResultsList {
-                                               if cnt_flags[v.P-1] {
-                                                       v.P = cnt_p
-                                                       cnt_p++
-                                                       temp_mrl = append(temp_mrl, v)
-                                               }
-                                       }
-                                       yy.MeasResultsList = temp_mrl
-                                       keep_cntr = true
-                               } else {
-                                       keep_cntr = true
-                               }
-                               if keep_class && keep_cntr && keep_inst {
-                                       keep = true
-                                       temp_mvl = append(temp_mvl, yy)
-                               }
-                       }
-                       if keep {
-                               zz.MeasValuesList = temp_mvl
-                               temp_mil = append(temp_mil, zz)
-                               modified = true
-                       }
-
-               }
-               //Only if modified
-               if modified {
-                       if len(temp_mil) == 0 {
-                               log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
-                               return nil
-                       }
-                       data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
-               }
-       }
-       log.Debug("Filter: ", time.Since(start).String())
-       return data
-}
-
-func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
-
-       log.Info("Type job", type_id, " started")
-       log.Debug("influx job ch ", data_in_ch)
-       filters := make(map[string]Filter)
-       filterParams_list := make(map[string]FilterMaps)
-       influx_job_params := make(map[string]InfluxJobParameters)
-       var mc *minio.Client
-       const mc_id = "mc_" + "start_job_json_file_data_influx"
-       running := true
-       for {
-               select {
-               case job_ctl := <-control_ch:
-                       log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
-                       switch job_ctl.command {
-                       case "EXIT":
-                               //ignore cmd - handled by channel signal
-                       case "ADD-FILTER":
-
-                               filters[job_ctl.filter.JobId] = job_ctl.filter
-                               log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
-                               log.Debug(job_ctl.filter)
-                               tmp_filterParams_list := make(map[string]FilterMaps)
-                               for k, v := range filterParams_list {
-                                       tmp_filterParams_list[k] = v
-                               }
-                               tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
-                               filterParams_list = tmp_filterParams_list
-
-                               tmp_influx_job_params := make(map[string]InfluxJobParameters)
-                               for k, v := range influx_job_params {
-                                       tmp_influx_job_params[k] = v
-                               }
-                               tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
-                               influx_job_params = tmp_influx_job_params
-
-                       case "REMOVE-FILTER":
-
-                               log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
-
-                               tmp_filterParams_list := make(map[string]FilterMaps)
-                               for k, v := range filterParams_list {
-                                       tmp_filterParams_list[k] = v
-                               }
-                               delete(tmp_filterParams_list, job_ctl.filter.JobId)
-                               filterParams_list = tmp_filterParams_list
-
-                               tmp_influx_job_params := make(map[string]InfluxJobParameters)
-                               for k, v := range influx_job_params {
-                                       tmp_influx_job_params[k] = v
-                               }
-                               delete(tmp_influx_job_params, job_ctl.filter.JobId)
-                               influx_job_params = tmp_influx_job_params
-                       }
-
-               case msg := <-data_in_ch:
-                       log.Debug("Data reveived - influx")
-                       if msg == nil {
-                               log.Info("Type job ", type_id, " stopped by channel signal -  start_job_xml_file_data")
-
-                               running = false
-                               return
-                       }
-                       if objectstore {
-                               if mc == nil {
-                                       var err *error
-                                       mc, err = create_minio_client(mc_id)
-                                       if err != nil {
-                                               log.Debug("Cannot create minio client for type job: ", type_id)
-                                       }
-                               }
-                       }
-
-                       jobLimiterChan <- struct{}{}
-                       go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
-
-               case <-time.After(1 * time.Second):
-                       if !running {
-                               return
-                       }
-               }
-       }
-}
-
-func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
-
-       log.Debug("run_json_file_data_job_influx")
-       //Release job limit
-       defer func() {
-               <-jobLimiterChan
-       }()
-
-       PrintMemUsage()
-
-       var evt_data FileDownloadedEvt
-       err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
-       if err != nil {
-               log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
-               return
-       }
-       log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
-
-       var reader io.Reader
-
-       INPUTBUCKET := "pm-files-json"
-       filename := ""
-       if objectstore == false {
-               filename = files_volume + "/" + evt_data.Filename
-               fi, err := os.Open(filename)
-
-               if err != nil {
-                       log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
-                       return
-               }
-               defer fi.Close()
-               reader = fi
-       } else {
-               filename = "/" + evt_data.Filename
-               if mc != nil {
-                       if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
-                               log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
-                               return
-                       }
-                       tctx := context.Background()
-                       mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
-                       if err != nil {
-                               log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
-                               return
-                       }
-                       reader = mr
-                       defer mr.Close()
-               } else {
-                       log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
-                       return
-               }
-       }
-
-       var data *[]byte
-       if strings.HasSuffix(filename, "gz") {
-               start := time.Now()
-               var buf2 bytes.Buffer
-               errb := gunzipReaderToWriter(&buf2, reader)
-               if errb != nil {
-                       log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
-                       return
-               }
-               d := buf2.Bytes()
-               data = &d
-               log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
-       } else {
-
-               start := time.Now()
-               d, err := io.ReadAll(reader)
-               if err != nil {
-                       log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
-                       return
-               }
-               data = &d
-
-               log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
-       }
-       for k, v := range filterList {
-
-               var pmfile PMJsonFile
-               start := time.Now()
-               err = jsoniter.Unmarshal(*data, &pmfile)
-               log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
-               if err != nil {
-                       log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
-                       return
-               }
-
-               if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
-                       b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
-                       if b == nil {
-                               log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
-                               return
-                       }
-
-               }
-               fluxParms := influxList[k]
-               log.Debug("Influxdb params: ", fluxParms)
-               client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
-               writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
-
-               for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
-                       ctr_names := make(map[string]string)
-                       for cni, cn := range zz.MeasTypes.SMeasTypesList {
-                               ctr_names[strconv.Itoa(cni+1)] = cn
-                       }
-                       for _, xx := range zz.MeasValuesList {
-                               log.Debug("Measurement: ", xx.MeasObjInstID)
-                               log.Debug("Suspect flag: ", xx.SuspectFlag)
-                               p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
-                               p.AddField("suspectflag", xx.SuspectFlag)
-                               p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
-                               p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
-                               for _, yy := range xx.MeasResultsList {
-                                       pi := strconv.Itoa(yy.P)
-                                       pv := yy.SValue
-                                       pn := ctr_names[pi]
-                                       log.Debug("Counter: ", pn, " Value: ", pv)
-                                       pv_i, err := strconv.Atoi(pv)
-                                       if err == nil {
-                                               p.AddField(pn, pv_i)
-                                       } else {
-                                               p.AddField(pn, pv)
-                                       }
-                               }
-                               //p.SetTime(timeT)
-                               log.Debug("StartEpochMicrosec from common event header:  ", pmfile.Event.CommonEventHeader.StartEpochMicrosec)
-                               log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
-                               p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
-                               err := writeAPI.WritePoint(context.Background(), p)
-                               if err != nil {
-                                       log.Error("Db write error: ", err)
-                               }
-                       }
-
-               }
-               client.Close()
-       }
-
 }