X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pm-file-converter%2Fmain.go;h=d37b0d287ed0f066563d2be1366a24d9a7a44d99;hb=08e640604edf8056bfe35155c04e88653e1872c4;hp=9c3d8a412054e32655d1c0f580177bb71a79e33c;hpb=5a01ec4da9c004c8b74d2997ee7d3ffa77dfaf95;p=nonrtric%2Fplt%2Franpm.git diff --git a/pm-file-converter/main.go b/pm-file-converter/main.go index 9c3d8a4..d37b0d2 100644 --- a/pm-file-converter/main.go +++ b/pm-file-converter/main.go @@ -444,13 +444,12 @@ func main() { cer, err := tls.LoadX509KeyPair(server_crt, server_key) if err != nil { - log.Error("Cannot load key and cert - %v\n", err) + log.Error("Cannot load key and cert - ", err) return } config := &tls.Config{Certificates: []tls.Certificate{cer}} https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil} - //TODO: Make http on/off configurable // Run http go func() { log.Info("Starting http service...") @@ -458,11 +457,10 @@ func main() { if err == http.ErrServerClosed { // graceful shutdown log.Info("http server shutdown...") } else if err != nil { - log.Error("http server error: %v\n", err) + log.Error("http server error: ", err) } }() - //TODO: Make https on/off configurable // Run https go func() { log.Info("Starting https service...") @@ -470,7 +468,7 @@ func main() { if err == http.ErrServerClosed { // graceful shutdown log.Info("https server shutdown...") } else if err != nil { - log.Error("https server error: %v\n", err) + log.Error("https server error: ", err) } }() check_tcp(strconv.Itoa(http_port)) @@ -596,7 +594,6 @@ func register_producer() bool { log.Error("Registering producer: ", producer_instance_name, " - failed") return false } else { - //TODO: http/https should be configurable ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "") if !ok { log.Error("Cannot register type: ", data.ProdDataTypes[i].ID) @@ -611,9 +608,7 @@ func register_producer() bool { log.Debug("Registering types: ", new_type_names) m := make(map[string]interface{}) m["supported_info_types"] = new_type_names - //TODO: http/https should be configurable m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name - //TODO: http/https should be configurable m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name json, err := json.Marshal(m) @@ -673,7 +668,6 @@ func remove_type_job(dp DataType) { if dp.ext_job_created == true { dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType) - //TODO: http/https should be configurable ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "") if !ok { log.Error("Cannot delete job: ", dp.ext_job_id) @@ -709,8 +703,6 @@ func start_type_job(dp DataType) { go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false) case "json-file-data-from-filestore-to-influx": go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true) - // case "json-data-to-influx": - // go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel) default: } @@ -741,13 +733,11 @@ func create_ext_job(dp DataType) { } dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType) - //TODO: http/https should be configurable ok := false for 
!ok { ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "") if !ok { log.Error("Cannot register job: ", dp.InputJobType) - //TODO: Restart after long time? } } log.Debug("Registered job ok: ", dp.InputJobType) @@ -833,132 +823,6 @@ func send_http_request(json []byte, method string, url string, retry bool, useAu return false } -// // Send a http request with json (json may be nil) -// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool { - -// // set the HTTP method, url, and request body -// var req *http.Request -// var err error -// if json == nil { -// req, err = http.NewRequest(method, url, http.NoBody) -// } else { -// req, err = http.NewRequest(method, url, bytes.NewBuffer(json)) -// req.Header.Set("Content-Type", "application/json; charset=utf-8") -// } -// if err != nil { -// log.Error("Cannot create http request, method: ", method, " url: ", url) -// return false -// } - -// if useAuth { -// token, err := fetch_token() -// if err != nil { -// log.Error("Cannot fetch token for http request: ", err) -// return false -// } -// req.Header.Set("Authorization", "Bearer "+token.TokenValue) -// } - -// log.Debug("HTTP request: ", req) - -// retries := 1 -// if retry { -// retries = 5 -// } -// sleep_time := 1 -// for i := retries; i > 0; i-- { -// log.Debug("Sending http request") -// resp, err2 := httpclient.Do(req) -// if err2 != nil { -// log.Error("Http request error: ", err2) -// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i-1) - -// time.Sleep(time.Duration(sleep_time) * time.Second) -// sleep_time = 2 * sleep_time -// } else { -// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 { -// log.Debug("Accepted http status: ", resp.StatusCode) -// resp.Body.Close() -// return true -// } -// log.Debug("HTTP resp: ", resp) -// resp.Body.Close() -// } -// } -// return false -// } - -// // Send a http request with json (json may be nil) -// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool { -// // initialize http client -// client := &http.Client{} - -// // set the HTTP method, url, and request body -// var req *http.Request -// var err error -// if json == nil { -// req, err = http.NewRequest(method, url, http.NoBody) -// } else { -// req, err = http.NewRequest(method, url, bytes.NewBuffer(json)) -// req.Header.Set("Content-Type", "application/json; charset=utf-8") -// } -// if err != nil { -// log.Error("Cannot create http request method: ", method, " url: ", url) -// return false -// } - -// useAuth = false -// if useAuth { -// token, err := fetch_token() -// if err != nil { -// log.Error("Cannot fetch token for http request: ", err) -// return false -// } -// req.Header.Add("Authorization", "Bearer "+token.TokenValue) -// } -// log.Debug("HTTP request: ", req) - -// b, berr := io.ReadAll(req.Body) -// if berr == nil { -// log.Debug("HTTP request body length: ", len(b)) -// } else { -// log.Debug("HTTP request - cannot check body length: ", berr) -// } -// if json == nil { -// log.Debug("HTTP request null json") -// } else { -// log.Debug("HTTP request json: ", string(json)) -// } -// requestDump, cerr := httputil.DumpRequestOut(req, true) -// if cerr != nil { -// fmt.Println(cerr) -// } -// fmt.Println(string(requestDump)) - -// retries := 1 -// if retry { -// retries = 5 -// } -// sleep_time := 1 -// for i := retries; i > 0; i-- { -// resp, 
err2 := client.Do(req) -// if err2 != nil { -// log.Error("Http request error: ", err2) -// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i) - -// time.Sleep(time.Duration(sleep_time) * time.Second) -// sleep_time = 2 * sleep_time -// } else { -// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 { -// log.Debug("Accepted http status: ", resp.StatusCode) -// defer resp.Body.Close() -// return true -// } -// } -// } -// return false -// } - func fetch_token() (*kafka.OAuthBearerToken, error) { log.Debug("Get token inline") conf := &clientcredentials.Config{ @@ -979,8 +843,6 @@ func fetch_token() (*kafka.OAuthBearerToken, error) { log.Debug("=====================================================") log.Debug("Expiration: ", token.Expiry) t := token.Expiry - // t := token.Expiry.Add(-time.Minute) - // log.Debug("Modified expiration: ", t) oauthBearerToken := kafka.OAuthBearerToken{ TokenValue: token.AccessToken, Expiration: t, @@ -1170,8 +1032,6 @@ func create_job(w http.ResponseWriter, req *http.Request) { return } - //TODO: Verify that job contains enough parameters... - if !job_found { job_record = InfoJobRecord{} job_record.job_info = t @@ -1190,7 +1050,6 @@ func create_job(w http.ResponseWriter, req *http.Request) { jc.command = "ADD-FILTER" - //TODO: Refactor if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" { fm := FilterMaps{} fm.sourceNameMap = make(map[string]bool) @@ -1292,7 +1151,7 @@ func supervise_producer(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } -// producer statictics, all jobs +// producer statistics, all jobs func statistics(w http.ResponseWriter, req *http.Request) { if req.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) @@ -1407,7 +1266,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont case reader_ctrl := <-control_ch: if reader_ctrl.command == "EXIT" { log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") - //TODO: Stop consumer if present? data_ch <- nil //Signal to job handler running = false return @@ -1457,7 +1315,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont } } default: - //TODO: Handle these? log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String()) } @@ -1475,7 +1332,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont go func() { for { - //maxDur := 1 * time.Second for { select { case reader_ctrl := <-control_ch: @@ -1498,9 +1354,8 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont var kmsg KafkaPayload kmsg.msg = e - c.Commit() //TODO: Ok? + c.Commit() - //TODO: Check for exception data_ch <- &kmsg stats.in_msg_cnt++ log.Debug("Reader msg: ", &kmsg) @@ -1524,24 +1379,6 @@ func start_topic_reader(topic string, type_id string, control_ch chan ReaderCont default: fmt.Printf("Ignored %v\n", e) } - - // orig code - // msg, err := c.ReadMessage(maxDur) - // if err == nil { - // var kmsg KafkaPayload - // kmsg.msg = msg - - // c.Commit() //TODO: Ok? 
- - // //TODO: Check for exception - // data_ch <- &kmsg - // stats.in_msg_cnt++ - // log.Debug("Reader msg: ", &kmsg) - // log.Debug("Reader - data_ch ", data_ch) - // } else { - // log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic, ", reason: ", err) - // } - } } } @@ -1569,7 +1406,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa time.Sleep(1 * time.Second) } else { log.Debug("Kafka producer started") - //defer kafka_producer.Close() } } } @@ -1579,7 +1415,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa for { select { case evt := <-kafka_producer.Events(): - //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend? switch evt.(type) { case *kafka.Message: m := evt.(*kafka.Message) @@ -1630,7 +1465,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa case kmsg := <-data_ch: if kmsg == nil { event_chan <- 0 - // TODO: Close producer? log.Info("Topic writer stopped by channel signal - start_topic_writer") defer kafka_producer.Close() return @@ -1654,7 +1488,6 @@ func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayloa } } if !msg_ok { - //TODO: Retry sending msg? log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err) } case <-time.After(1000 * time.Millisecond): @@ -1676,7 +1509,6 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum "client.id": cid, "auto.offset.reset": "latest", "enable.auto.commit": false, - //"auto.commit.interval.ms": 5000, } } else { log.Info("Creating kafka SASL plain text consumer for type: ", type_id) @@ -1692,15 +1524,11 @@ func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consum } c, err := kafka.NewConsumer(&cm) - //TODO: How to handle autocommit or commit message by message - //TODO: Make arg to kafka configurable - if err != nil { log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err) return nil } - //c.Commit() log.Info("Created kafka consumer for type: ", type_id, " OK") return c } @@ -1815,7 +1643,6 @@ func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in } } } - //TODO: Sort processed file conversions in order (FIFO) jobLimiterChan <- struct{}{} go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id) @@ -1825,7 +1652,6 @@ func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in } } } - //}() } func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) { @@ -1854,7 +1680,6 @@ func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, da var reader io.Reader - //TODO -> config INPUTBUCKET := "ropfiles" filename := "" @@ -1868,7 +1693,6 @@ func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, da } defer fi.Close() reader = fi - //} else if evt_data.ObjectStoreBucket != "" { } else { filename = evt_data.Name if mc != nil { @@ -2038,7 +1862,6 @@ func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, er start = time.Now() var pmfile PMJsonFile - //TODO: Fill in more values pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0" 
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = "" @@ -2104,7 +1927,6 @@ func start_job_json_file_data(type_id string, control_ch chan JobControl, data_i filters := make(map[string]Filter) filterParams_list := make(map[string]FilterMaps) - // ch_list := make(map[string]chan *KafkaPayload) topic_list := make(map[string]string) var mc *minio.Client const mc_id = "mc_" + "start_job_json_file_data" @@ -2115,9 +1937,7 @@ func start_job_json_file_data(type_id string, control_ch chan JobControl, data_i log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) switch job_ctl.command { case "EXIT": - //ignore cmd - handled by channel signal case "ADD-FILTER": - //TODO: Refactor... filters[job_ctl.filter.JobId] = job_ctl.filter log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) @@ -2135,7 +1955,6 @@ func start_job_json_file_data(type_id string, control_ch chan JobControl, data_i tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic topic_list = tmp_topic_list case "REMOVE-FILTER": - //TODO: Refactor... log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) tmp_filterParams_list := make(map[string]FilterMaps) @@ -2200,8 +2019,6 @@ func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, f var reader io.Reader - //TODO -> config - //INPUTBUCKET := "json-file-ready" INPUTBUCKET := "pm-files-json" filename := "" if objectstore == false { @@ -2260,16 +2077,6 @@ func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, f log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String()) } - // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside - // var pmfile PMJsonFile - // start := time.Now() - // err = jsoniter.Unmarshal(*data, &pmfile) - // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) - - // if err != nil { - // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) - // return - // } for k, v := range filterList { var pmfile PMJsonFile @@ -2290,27 +2097,14 @@ func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, f log.Debug("measObjClassMap:", v.measObjClassMap) log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap) log.Debug("measTypesMap:", v.measTypesMap) - //BMX if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 { + b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap) if b == nil { log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ") return } kmsg.msg.Value = *b - //BMX} - - // if outputCompression == "json.gz" { - // start := time.Now() - // var buf bytes.Buffer - // err := gzipWrite(&buf, &kmsg.msg.Value) - // if err != nil { - // log.Error("Cannot compress file/obj ", filename, "for job: ", job_id, " - discarding message, error details", err) - // return - - // } - // kmsg.msg.Value = buf.Bytes() - // log.Debug("Compress file/obj ", filename, "for job: ", job_id, " time:", time.Since(start).String()) - // } + kmsg.topic = topic_list[k] kmsg.jobid = k @@ -2441,122 +2235,6 @@ func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[ return data } 
-// func json_pm_filter(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte { - -// filter_req := true -// start := time.Now() -// if len(sourceNameMap) != 0 { -// if !sourceNameMap[data.Event.CommonEventHeader.SourceName] { -// filter_req = false -// return nil -// } -// } -// if filter_req { -// modified := false -// var temp_mil []MeasInfoList -// for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList { - -// check_cntr := false -// var cnt_flags []bool -// if len(measTypesMap) > 0 { -// c_cntr := 0 -// var temp_mtl []string -// for _, v := range zz.MeasTypes.SMeasTypesList { -// if measTypesMap[v] { -// cnt_flags = append(cnt_flags, true) -// c_cntr++ -// temp_mtl = append(temp_mtl, v) -// } else { -// cnt_flags = append(cnt_flags, false) -// } -// } -// if c_cntr > 0 { -// check_cntr = true -// zz.MeasTypes.SMeasTypesList = temp_mtl -// } else { -// modified = true -// continue -// } -// } -// keep := false -// var temp_mvl []MeasValues -// for _, yy := range zz.MeasValuesList { -// keep_class := false -// keep_inst := false -// keep_cntr := false - -// dna := strings.Split(yy.MeasObjInstID, ",") -// instName := dna[len(dna)-1] -// cls := strings.Split(dna[len(dna)-1], "=")[0] - -// if len(measObjClassMap) > 0 { -// if measObjClassMap[cls] { -// keep_class = true -// } -// } else { -// keep_class = true -// } - -// if len(measObjInstIdsMap) > 0 { -// if measObjInstIdsMap[instName] { -// keep_inst = true -// } -// } else { -// keep_inst = true -// } - -// if check_cntr { -// var temp_mrl []MeasResults -// cnt_p := 1 -// for _, v := range yy.MeasResultsList { -// if cnt_flags[v.P-1] { -// v.P = cnt_p -// cnt_p++ -// temp_mrl = append(temp_mrl, v) -// } -// } -// yy.MeasResultsList = temp_mrl -// keep_cntr = true -// } else { -// keep_cntr = true -// } -// if keep_class && keep_cntr && keep_inst { -// keep = true -// temp_mvl = append(temp_mvl, yy) -// } -// } -// if keep { -// zz.MeasValuesList = temp_mvl -// temp_mil = append(temp_mil, zz) -// modified = true -// } - -// } -// //Only if modified -// if modified { -// if len(temp_mil) == 0 { -// log.Debug("Msg filtered, nothing found, discarding, obj: ", resource) -// return nil -// } -// data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil -// } -// } -// log.Debug("Filter: ", time.Since(start).String()) - -// start = time.Now() -// j, err := jsoniter.Marshal(&data) - -// log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String()) - -// if err != nil { -// log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err) -// return nil -// } - -// log.Debug("Filtered json obj: ", resource, " len: ", len(j)) -// return &j -// } - func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) { log.Info("Type job", type_id, " started") @@ -2575,7 +2253,7 @@ func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, case "EXIT": //ignore cmd - handled by channel signal case "ADD-FILTER": - //TODO: Refactor... 
+ filters[job_ctl.filter.JobId] = job_ctl.filter log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) log.Debug(job_ctl.filter) @@ -2594,7 +2272,7 @@ func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, influx_job_params = tmp_influx_job_params case "REMOVE-FILTER": - //TODO: Refactor... + log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) tmp_filterParams_list := make(map[string]FilterMaps) @@ -2629,7 +2307,7 @@ func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, } } } - //TODO: Sort processed file conversions in order (FIFO) + jobLimiterChan <- struct{}{} go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore) @@ -2661,8 +2339,6 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList var reader io.Reader - //TODO -> config - //INPUTBUCKET := "json-file-ready" INPUTBUCKET := "pm-files-json" filename := "" if objectstore == false { @@ -2720,16 +2396,6 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String()) } - // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside - // var pmfile PMJsonFile - // start := time.Now() - // err = jsoniter.Unmarshal(*data, &pmfile) - // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) - - // if err != nil { - // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) - // return - // } for k, v := range filterList { var pmfile PMJsonFile @@ -2755,16 +2421,10 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken) writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket) - // fmt.Println(pmfile.Event.CommonEventHeader.StartEpochMicrosec) - // tUnix := pmfile.Event.CommonEventHeader.StartEpochMicrosec / int64(time.Millisecond) - // tUnixNanoRemainder := (pmfile.Event.CommonEventHeader.StartEpochMicrosec % int64(time.Millisecond)) * int64(time.Microsecond) - // timeT := time.Unix(tUnix, tUnixNanoRemainder) - // fmt.Println(timeT) - // fmt.Println("======================") for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList { ctr_names := make(map[string]string) for cni, cn := range zz.MeasTypes.SMeasTypesList { - ctr_names[string(cni+1)] = cn + ctr_names[strconv.Itoa(cni+1)] = cn } for _, xx := range zz.MeasValuesList { log.Debug("Measurement: ", xx.MeasObjInstID) @@ -2774,7 +2434,7 @@ func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod) p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset) for _, yy := range xx.MeasResultsList { - pi := string(yy.P) + pi := strconv.Itoa(yy.P) pv := yy.SValue pn := ctr_names[pi] log.Debug("Counter: ", pn, " Value: ", pv)
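
A note on the change at the end of the last hunk, where string(cni+1) and string(yy.P) become strconv.Itoa(...): this fixes how the InfluxDB writer maps a counter's 1-based position to its name. In Go, a plain int-to-string conversion yields the rune with that code point, not the decimal text, so the old map keys and lookup keys were control characters rather than "1", "2", and so on. A minimal, self-contained sketch of the difference (the counter names below are made up for illustration, not taken from the source):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	p := 7

	// An int-to-string conversion gives the rune with code point 7 ("\a"),
	// which is what the old string(yy.P) produced.
	fmt.Printf("%q\n", string(rune(p)))
	// strconv.Itoa gives the decimal text "7", matching how the map is keyed.
	fmt.Printf("%q\n", strconv.Itoa(p))

	// The converter keys counter names by 1-based position, so the key used
	// when building the map and the key used when looking up a measurement
	// result must be produced the same way:
	ctrNames := make(map[string]string)
	for i, name := range []string{"pmCounterA", "pmCounterB"} { // hypothetical counter names
		ctrNames[strconv.Itoa(i+1)] = name
	}
	fmt.Println(ctrNames[strconv.Itoa(2)]) // pmCounterB
}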
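
Similarly, the hunks near the top of the diff drop the printf verbs from the log.Error calls. Assuming log here is sirupsen/logrus, Error formats its arguments Sprint-style, so the old "%v\n" was written out literally instead of being replaced by the error value; the patch appends err as a separate argument instead, and Errorf would be the printf-style alternative. A small sketch under that assumption:

package main

import (
	"errors"

	log "github.com/sirupsen/logrus"
)

func main() {
	err := errors.New("open server.crt: no such file or directory")

	// Old style: Error is not printf-style, so "%v\n" appears literally in the log line.
	log.Error("Cannot load key and cert - %v\n", err)

	// Patched style: the error is appended as an extra Sprint-style argument.
	log.Error("Cannot load key and cert - ", err)

	// Printf-style alternative, if format verbs are wanted.
	log.Errorf("Cannot load key and cert - %v", err)
}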