Updated installation and components
[nonrtric/plt/ranpm.git] / pm-file-converter / main.go
index 9c3d8a4..d37b0d2 100644 (file)
@@ -444,13 +444,12 @@ func main() {
 
        cer, err := tls.LoadX509KeyPair(server_crt, server_key)
        if err != nil {
-               log.Error("Cannot load key and cert - %v\n", err)
+               log.Error("Cannot load key and cert - ", err)
                return
        }
        config := &tls.Config{Certificates: []tls.Certificate{cer}}
        https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
 
-       //TODO: Make http on/off configurable
        // Run http
        go func() {
                log.Info("Starting http service...")
@@ -458,11 +457,10 @@ func main() {
                if err == http.ErrServerClosed { // graceful shutdown
                        log.Info("http server shutdown...")
                } else if err != nil {
-                       log.Error("http server error: %v\n", err)
+                       log.Error("http server error: ", err)
                }
        }()
 
-       //TODO: Make https on/off configurable
        //  Run https
        go func() {
                log.Info("Starting https service...")
@@ -470,7 +468,7 @@ func main() {
                if err == http.ErrServerClosed { // graceful shutdown
                        log.Info("https server shutdown...")
                } else if err != nil {
-                       log.Error("https server error: %v\n", err)
+                       log.Error("https server error: ", err)
                }
        }()
        check_tcp(strconv.Itoa(http_port))
@@ -596,7 +594,6 @@ func register_producer() bool {
                        log.Error("Registering producer: ", producer_instance_name, " - failed")
                        return false
                } else {
-                       //TODO: http/https should be configurable
                        ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
                        if !ok {
                                log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
@@ -611,9 +608,7 @@ func register_producer() bool {
        log.Debug("Registering types: ", new_type_names)
        m := make(map[string]interface{})
        m["supported_info_types"] = new_type_names
-       //TODO: http/https should be configurable
        m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
-       //TODO: http/https should be configurable
        m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
 
        json, err := json.Marshal(m)
@@ -673,7 +668,6 @@ func remove_type_job(dp DataType) {
 
        if dp.ext_job_created == true {
                dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
-               //TODO: http/https should be configurable
                ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
                if !ok {
                        log.Error("Cannot delete job: ", dp.ext_job_id)
@@ -709,8 +703,6 @@ func start_type_job(dp DataType) {
                go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
        case "json-file-data-from-filestore-to-influx":
                go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
-       // case "json-data-to-influx":
-       //      go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel)
 
        default:
        }
@@ -741,13 +733,11 @@ func create_ext_job(dp DataType) {
                }
 
                dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
-               //TODO: http/https should be configurable
                ok := false
                for !ok {
                        ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
                        if !ok {
                                log.Error("Cannot register job: ", dp.InputJobType)
-                               //TODO: Restart after long time?
                        }
                }
                log.Debug("Registered job ok: ", dp.InputJobType)
@@ -833,132 +823,6 @@ func send_http_request(json []byte, method string, url string, retry bool, useAu
        return false
 }
 
-// // Send a http request with json (json may be nil)
-// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
-
-//     // set the HTTP method, url, and request body
-//     var req *http.Request
-//     var err error
-//     if json == nil {
-//             req, err = http.NewRequest(method, url, http.NoBody)
-//     } else {
-//             req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
-//             req.Header.Set("Content-Type", "application/json; charset=utf-8")
-//     }
-//     if err != nil {
-//             log.Error("Cannot create http request, method: ", method, " url: ", url)
-//             return false
-//     }
-
-//     if useAuth {
-//             token, err := fetch_token()
-//             if err != nil {
-//                     log.Error("Cannot fetch token for http request: ", err)
-//                     return false
-//             }
-//             req.Header.Set("Authorization", "Bearer "+token.TokenValue)
-//     }
-
-//     log.Debug("HTTP request: ", req)
-
-//     retries := 1
-//     if retry {
-//             retries = 5
-//     }
-//     sleep_time := 1
-//     for i := retries; i > 0; i-- {
-//             log.Debug("Sending http request")
-//             resp, err2 := httpclient.Do(req)
-//             if err2 != nil {
-//                     log.Error("Http request error: ", err2)
-//                     log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i-1)
-
-//                     time.Sleep(time.Duration(sleep_time) * time.Second)
-//                     sleep_time = 2 * sleep_time
-//             } else {
-//                     if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
-//                             log.Debug("Accepted http status: ", resp.StatusCode)
-//                             resp.Body.Close()
-//                             return true
-//                     }
-//                     log.Debug("HTTP resp: ", resp)
-//                     resp.Body.Close()
-//             }
-//     }
-//     return false
-// }
-
-// // Send a http request with json (json may be nil)
-// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
-//     // initialize http client
-//     client := &http.Client{}
-
-//     // set the HTTP method, url, and request body
-//     var req *http.Request
-//     var err error
-//     if json == nil {
-//             req, err = http.NewRequest(method, url, http.NoBody)
-//     } else {
-//             req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
-//             req.Header.Set("Content-Type", "application/json; charset=utf-8")
-//     }
-//     if err != nil {
-//             log.Error("Cannot create http request method: ", method, " url: ", url)
-//             return false
-//     }
-
-//     useAuth = false
-//     if useAuth {
-//             token, err := fetch_token()
-//             if err != nil {
-//                     log.Error("Cannot fetch token for http request: ", err)
-//                     return false
-//             }
-//             req.Header.Add("Authorization", "Bearer "+token.TokenValue)
-//     }
-//     log.Debug("HTTP request: ", req)
-
-//     b, berr := io.ReadAll(req.Body)
-//     if berr == nil {
-//             log.Debug("HTTP request body length: ", len(b))
-//     } else {
-//             log.Debug("HTTP request - cannot check body length: ", berr)
-//     }
-//     if json == nil {
-//             log.Debug("HTTP request null json")
-//     } else {
-//             log.Debug("HTTP request json: ", string(json))
-//     }
-//     requestDump, cerr := httputil.DumpRequestOut(req, true)
-//     if cerr != nil {
-//             fmt.Println(cerr)
-//     }
-//     fmt.Println(string(requestDump))
-
-//     retries := 1
-//     if retry {
-//             retries = 5
-//     }
-//     sleep_time := 1
-//     for i := retries; i > 0; i-- {
-//             resp, err2 := client.Do(req)
-//             if err2 != nil {
-//                     log.Error("Http request error: ", err2)
-//                     log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i)
-
-//                     time.Sleep(time.Duration(sleep_time) * time.Second)
-//                     sleep_time = 2 * sleep_time
-//             } else {
-//                     if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
-//                             log.Debug("Accepted http status: ", resp.StatusCode)
-//                             defer resp.Body.Close()
-//                             return true
-//                     }
-//             }
-//     }
-//     return false
-// }
-
 func fetch_token() (*kafka.OAuthBearerToken, error) {
        log.Debug("Get token inline")
        conf := &clientcredentials.Config{
@@ -979,8 +843,6 @@ func fetch_token() (*kafka.OAuthBearerToken, error) {
        log.Debug("=====================================================")
        log.Debug("Expiration: ", token.Expiry)
        t := token.Expiry
-       // t := token.Expiry.Add(-time.Minute)
-       // log.Debug("Modified expiration: ", t)
        oauthBearerToken := kafka.OAuthBearerToken{
                TokenValue: token.AccessToken,
                Expiration: t,
@@ -1170,8 +1032,6 @@ func create_job(w http.ResponseWriter, req *http.Request) {
                return
        }
 
-       //TODO: Verify that job contains enough parameters...
-
        if !job_found {
                job_record = InfoJobRecord{}
                job_record.job_info = t
@@ -1190,7 +1050,6 @@ func create_job(w http.ResponseWriter, req *http.Request) {
 
                jc.command = "ADD-FILTER"
 
-               //TODO: Refactor
                if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
                        fm := FilterMaps{}
                        fm.sourceNameMap = make(map[string]bool)
@@ -1292,7 +1151,7 @@ func supervise_producer(w http.ResponseWriter, req *http.Request) {
        w.WriteHeader(http.StatusOK)
 }
 
-// producer statictics, all jobs
+// producer statistics, all jobs
 func statistics(w http.ResponseWriter, req *http.Request) {
        if req.Method != http.MethodGet {
                w.WriteHeader(http.StatusMethodNotAllowed)
@@ -1407,7 +1266,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont
                case reader_ctrl := <-control_ch:
                        if reader_ctrl.command == "EXIT" {
                                log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
-                               //TODO: Stop consumer if present?
                                data_ch <- nil //Signal to job handler
                                running = false
                                return
@@ -1457,7 +1315,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont
                                                }
                                        }
                                default:
-                                       //TODO: Handle these?
                                        log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
                                }
 
@@ -1475,7 +1332,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont
 
        go func() {
                for {
-                       //maxDur := 1 * time.Second
                        for {
                                select {
                                case reader_ctrl := <-control_ch:
@@ -1498,9 +1354,8 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont
                                                var kmsg KafkaPayload
                                                kmsg.msg = e
 
-                                               c.Commit() //TODO: Ok?
+                                               c.Commit()
 
-                                               //TODO: Check for exception
                                                data_ch <- &kmsg
                                                stats.in_msg_cnt++
                                                log.Debug("Reader msg: ", &kmsg)
@@ -1524,24 +1379,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont
                                        default:
                                                fmt.Printf("Ignored %v\n", e)
                                        }
-
-                                       // orig code
-                                       // msg, err := c.ReadMessage(maxDur)
-                                       // if err == nil {
-                                       //      var kmsg KafkaPayload
-                                       //      kmsg.msg = msg
-
-                                       //      c.Commit() //TODO: Ok?
-
-                                       //      //TODO: Check for exception
-                                       //      data_ch <- &kmsg
-                                       //      stats.in_msg_cnt++
-                                       //      log.Debug("Reader msg: ", &kmsg)
-                                       //      log.Debug("Reader - data_ch ", data_ch)
-                                       // } else {
-                                       //      log.Debug("Topic Reader for type: ", type_id, "  Nothing to consume on topic: ", topic, ", reason: ", err)
-                                       // }
-
                                }
                        }
                }
@@ -1569,7 +1406,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa
                                time.Sleep(1 * time.Second)
                        } else {
                                log.Debug("Kafka producer started")
-                               //defer kafka_producer.Close()
                        }
                }
        }
@@ -1579,7 +1415,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa
                for {
                        select {
                        case evt := <-kafka_producer.Events():
-                               //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend?
                                switch evt.(type) {
                                case *kafka.Message:
                                        m := evt.(*kafka.Message)
@@ -1630,7 +1465,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa
                        case kmsg := <-data_ch:
                                if kmsg == nil {
                                        event_chan <- 0
-                                       // TODO: Close producer?
                                        log.Info("Topic writer stopped by channel signal - start_topic_writer")
                                        defer kafka_producer.Close()
                                        return
@@ -1654,7 +1488,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa
                                        }
                                }
                                if !msg_ok {
-                                       //TODO: Retry sending msg?
                                        log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
                                }
                        case <-time.After(1000 * time.Millisecond):
@@ -1676,7 +1509,6 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum
                        "client.id":          cid,
                        "auto.offset.reset":  "latest",
                        "enable.auto.commit": false,
-                       //"auto.commit.interval.ms": 5000,
                }
        } else {
                log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
@@ -1692,15 +1524,11 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum
        }
        c, err := kafka.NewConsumer(&cm)
 
-       //TODO: How to handle autocommit or commit message by message
-       //TODO: Make arg to kafka configurable
-
        if err != nil {
                log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
                return nil
        }
 
-       //c.Commit()
        log.Info("Created kafka consumer for type: ", type_id, " OK")
        return c
 }
@@ -1815,7 +1643,6 @@ func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in
                                        }
                                }
                        }
-                       //TODO: Sort processed file conversions in order (FIFO)
                        jobLimiterChan <- struct{}{}
                        go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
 
@@ -1825,7 +1652,6 @@ func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in
                        }
                }
        }
-       //}()
 }
 
 func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
@@ -1854,7 +1680,6 @@ func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, da
 
        var reader io.Reader
 
-       //TODO -> config
        INPUTBUCKET := "ropfiles"
 
        filename := ""
@@ -1868,7 +1693,6 @@ func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, da
                }
                defer fi.Close()
                reader = fi
-               //} else if evt_data.ObjectStoreBucket != "" {
        } else {
                filename = evt_data.Name
                if mc != nil {
@@ -2038,7 +1862,6 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, er
        start = time.Now()
        var pmfile PMJsonFile
 
-       //TODO: Fill in more values
        pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
        pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
        pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
@@ -2104,7 +1927,6 @@ func start_job_json_file_data(type_id string, control_ch chan JobControl, data_i
 
        filters := make(map[string]Filter)
        filterParams_list := make(map[string]FilterMaps)
-       // ch_list := make(map[string]chan *KafkaPayload)
        topic_list := make(map[string]string)
        var mc *minio.Client
        const mc_id = "mc_" + "start_job_json_file_data"
@@ -2115,9 +1937,7 @@ func start_job_json_file_data(type_id string, control_ch chan JobControl, data_i
                        log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
                        switch job_ctl.command {
                        case "EXIT":
-                               //ignore cmd - handled by channel signal
                        case "ADD-FILTER":
-                               //TODO: Refactor...
                                filters[job_ctl.filter.JobId] = job_ctl.filter
                                log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
 
@@ -2135,7 +1955,6 @@ func start_job_json_file_data(type_id string, control_ch chan JobControl, data_i
                                tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
                                topic_list = tmp_topic_list
                        case "REMOVE-FILTER":
-                               //TODO: Refactor...
                                log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
 
                                tmp_filterParams_list := make(map[string]FilterMaps)
@@ -2200,8 +2019,6 @@ func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, f
 
        var reader io.Reader
 
-       //TODO -> config
-       //INPUTBUCKET := "json-file-ready"
        INPUTBUCKET := "pm-files-json"
        filename := ""
        if objectstore == false {
@@ -2260,16 +2077,6 @@ func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, f
                log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
        }
 
-       // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
-       // var pmfile PMJsonFile
-       // start := time.Now()
-       // err = jsoniter.Unmarshal(*data, &pmfile)
-       // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
-       // if err != nil {
-       //      log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
-       //      return
-       // }
        for k, v := range filterList {
 
                var pmfile PMJsonFile
@@ -2290,27 +2097,14 @@ func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, f
                log.Debug("measObjClassMap:", v.measObjClassMap)
                log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
                log.Debug("measTypesMap:", v.measTypesMap)
-               //BMX if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
+
                b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
                if b == nil {
                        log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
                        return
                }
                kmsg.msg.Value = *b
-               //BMX}
-
-               // if outputCompression == "json.gz" {
-               //      start := time.Now()
-               //      var buf bytes.Buffer
-               //      err := gzipWrite(&buf, &kmsg.msg.Value)
-               //      if err != nil {
-               //              log.Error("Cannot compress file/obj ", filename, "for job: ", job_id, " - discarding message, error details", err)
-               //              return
-
-               //      }
-               //      kmsg.msg.Value = buf.Bytes()
-               //      log.Debug("Compress file/obj ", filename, "for job: ", job_id, " time:", time.Since(start).String())
-               // }
+
                kmsg.topic = topic_list[k]
                kmsg.jobid = k
 
@@ -2441,122 +2235,6 @@ func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[
        return data
 }
 
-// func json_pm_filter(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
-
-//     filter_req := true
-//     start := time.Now()
-//     if len(sourceNameMap) != 0 {
-//             if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
-//                     filter_req = false
-//                     return nil
-//             }
-//     }
-//     if filter_req {
-//             modified := false
-//             var temp_mil []MeasInfoList
-//             for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
-
-//                     check_cntr := false
-//                     var cnt_flags []bool
-//                     if len(measTypesMap) > 0 {
-//                             c_cntr := 0
-//                             var temp_mtl []string
-//                             for _, v := range zz.MeasTypes.SMeasTypesList {
-//                                     if measTypesMap[v] {
-//                                             cnt_flags = append(cnt_flags, true)
-//                                             c_cntr++
-//                                             temp_mtl = append(temp_mtl, v)
-//                                     } else {
-//                                             cnt_flags = append(cnt_flags, false)
-//                                     }
-//                             }
-//                             if c_cntr > 0 {
-//                                     check_cntr = true
-//                                     zz.MeasTypes.SMeasTypesList = temp_mtl
-//                             } else {
-//                                     modified = true
-//                                     continue
-//                             }
-//                     }
-//                     keep := false
-//                     var temp_mvl []MeasValues
-//                     for _, yy := range zz.MeasValuesList {
-//                             keep_class := false
-//                             keep_inst := false
-//                             keep_cntr := false
-
-//                             dna := strings.Split(yy.MeasObjInstID, ",")
-//                             instName := dna[len(dna)-1]
-//                             cls := strings.Split(dna[len(dna)-1], "=")[0]
-
-//                             if len(measObjClassMap) > 0 {
-//                                     if measObjClassMap[cls] {
-//                                             keep_class = true
-//                                     }
-//                             } else {
-//                                     keep_class = true
-//                             }
-
-//                             if len(measObjInstIdsMap) > 0 {
-//                                     if measObjInstIdsMap[instName] {
-//                                             keep_inst = true
-//                                     }
-//                             } else {
-//                                     keep_inst = true
-//                             }
-
-//                             if check_cntr {
-//                                     var temp_mrl []MeasResults
-//                                     cnt_p := 1
-//                                     for _, v := range yy.MeasResultsList {
-//                                             if cnt_flags[v.P-1] {
-//                                                     v.P = cnt_p
-//                                                     cnt_p++
-//                                                     temp_mrl = append(temp_mrl, v)
-//                                             }
-//                                     }
-//                                     yy.MeasResultsList = temp_mrl
-//                                     keep_cntr = true
-//                             } else {
-//                                     keep_cntr = true
-//                             }
-//                             if keep_class && keep_cntr && keep_inst {
-//                                     keep = true
-//                                     temp_mvl = append(temp_mvl, yy)
-//                             }
-//                     }
-//                     if keep {
-//                             zz.MeasValuesList = temp_mvl
-//                             temp_mil = append(temp_mil, zz)
-//                             modified = true
-//                     }
-
-//             }
-//             //Only if modified
-//             if modified {
-//                     if len(temp_mil) == 0 {
-//                             log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
-//                             return nil
-//                     }
-//                     data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
-//             }
-//     }
-//     log.Debug("Filter: ", time.Since(start).String())
-
-//     start = time.Now()
-//     j, err := jsoniter.Marshal(&data)
-
-//     log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
-
-//     if err != nil {
-//             log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
-//             return nil
-//     }
-
-//     log.Debug("Filtered json obj: ", resource, " len: ", len(j))
-//     return &j
-// }
-
 func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
 
        log.Info("Type job", type_id, " started")
@@ -2575,7 +2253,7 @@ func start_job_json_file_data_influx(type_id string, control_ch chan JobControl,
                        case "EXIT":
                                //ignore cmd - handled by channel signal
                        case "ADD-FILTER":
-                               //TODO: Refactor...
+
                                filters[job_ctl.filter.JobId] = job_ctl.filter
                                log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
                                log.Debug(job_ctl.filter)
@@ -2594,7 +2272,7 @@ func start_job_json_file_data_influx(type_id string, control_ch chan JobControl,
                                influx_job_params = tmp_influx_job_params
 
                        case "REMOVE-FILTER":
-                               //TODO: Refactor...
+
                                log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
 
                                tmp_filterParams_list := make(map[string]FilterMaps)
@@ -2629,7 +2307,7 @@ func start_job_json_file_data_influx(type_id string, control_ch chan JobControl,
                                        }
                                }
                        }
-                       //TODO: Sort processed file conversions in order (FIFO)
+
                        jobLimiterChan <- struct{}{}
                        go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
 
@@ -2661,8 +2339,6 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList
 
        var reader io.Reader
 
-       //TODO -> config
-       //INPUTBUCKET := "json-file-ready"
        INPUTBUCKET := "pm-files-json"
        filename := ""
        if objectstore == false {
@@ -2720,16 +2396,6 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList
 
                log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
        }
-       // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
-       // var pmfile PMJsonFile
-       // start := time.Now()
-       // err = jsoniter.Unmarshal(*data, &pmfile)
-       // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
-       // if err != nil {
-       //      log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
-       //      return
-       // }
        for k, v := range filterList {
 
                var pmfile PMJsonFile
@@ -2755,16 +2421,10 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList
                client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
                writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
 
-               // fmt.Println(pmfile.Event.CommonEventHeader.StartEpochMicrosec)
-               // tUnix := pmfile.Event.CommonEventHeader.StartEpochMicrosec / int64(time.Millisecond)
-               // tUnixNanoRemainder := (pmfile.Event.CommonEventHeader.StartEpochMicrosec % int64(time.Millisecond)) * int64(time.Microsecond)
-               // timeT := time.Unix(tUnix, tUnixNanoRemainder)
-               // fmt.Println(timeT)
-               // fmt.Println("======================")
                for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
                        ctr_names := make(map[string]string)
                        for cni, cn := range zz.MeasTypes.SMeasTypesList {
-                               ctr_names[string(cni+1)] = cn
+                               ctr_names[strconv.Itoa(cni+1)] = cn
                        }
                        for _, xx := range zz.MeasValuesList {
                                log.Debug("Measurement: ", xx.MeasObjInstID)
@@ -2774,7 +2434,7 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList
                                p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
                                p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
                                for _, yy := range xx.MeasResultsList {
-                                       pi := string(yy.P)
+                                       pi := strconv.Itoa(yy.P)
                                        pv := yy.SValue
                                        pn := ctr_names[pi]
                                        log.Debug("Counter: ", pn, " Value: ", pv)