cer, err := tls.LoadX509KeyPair(server_crt, server_key)
if err != nil {
- log.Error("Cannot load key and cert - %v\n", err)
+ log.Error("Cannot load key and cert - ", err)
return
}
config := &tls.Config{Certificates: []tls.Certificate{cer}}
https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
- //TODO: Make http on/off configurable
// Run http
go func() {
log.Info("Starting http service...")
if err == http.ErrServerClosed { // graceful shutdown
log.Info("http server shutdown...")
} else if err != nil {
- log.Error("http server error: %v\n", err)
+ log.Error("http server error: ", err)
}
}()
- //TODO: Make https on/off configurable
// Run https
go func() {
log.Info("Starting https service...")
if err == http.ErrServerClosed { // graceful shutdown
log.Info("https server shutdown...")
} else if err != nil {
- log.Error("https server error: %v\n", err)
+ log.Error("https server error: ", err)
}
}()
check_tcp(strconv.Itoa(http_port))
log.Error("Registering producer: ", producer_instance_name, " - failed")
return false
} else {
- //TODO: http/https should be configurable
ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
log.Debug("Registering types: ", new_type_names)
m := make(map[string]interface{})
m["supported_info_types"] = new_type_names
- //TODO: http/https should be configurable
m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
- //TODO: http/https should be configurable
m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
json, err := json.Marshal(m)
if dp.ext_job_created == true {
dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
- //TODO: http/https should be configurable
ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
if !ok {
log.Error("Cannot delete job: ", dp.ext_job_id)
go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
case "json-file-data-from-filestore-to-influx":
go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
- // case "json-data-to-influx":
- // go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel)
default:
}
}
dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
- //TODO: http/https should be configurable
ok := false
for !ok {
ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register job: ", dp.InputJobType)
- //TODO: Restart after long time?
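+ // send_http_request retries internally with backoff when its retry flag is set; this outer loop repeats until registration succeeds.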
}
}
log.Debug("Registered job ok: ", dp.InputJobType)
return false
}
-// // Send a http request with json (json may be nil)
-// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
-
-// // set the HTTP method, url, and request body
-// var req *http.Request
-// var err error
-// if json == nil {
-// req, err = http.NewRequest(method, url, http.NoBody)
-// } else {
-// req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
-// req.Header.Set("Content-Type", "application/json; charset=utf-8")
-// }
-// if err != nil {
-// log.Error("Cannot create http request, method: ", method, " url: ", url)
-// return false
-// }
-
-// if useAuth {
-// token, err := fetch_token()
-// if err != nil {
-// log.Error("Cannot fetch token for http request: ", err)
-// return false
-// }
-// req.Header.Set("Authorization", "Bearer "+token.TokenValue)
-// }
-
-// log.Debug("HTTP request: ", req)
-
-// retries := 1
-// if retry {
-// retries = 5
-// }
-// sleep_time := 1
-// for i := retries; i > 0; i-- {
-// log.Debug("Sending http request")
-// resp, err2 := httpclient.Do(req)
-// if err2 != nil {
-// log.Error("Http request error: ", err2)
-// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i-1)
-
-// time.Sleep(time.Duration(sleep_time) * time.Second)
-// sleep_time = 2 * sleep_time
-// } else {
-// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
-// log.Debug("Accepted http status: ", resp.StatusCode)
-// resp.Body.Close()
-// return true
-// }
-// log.Debug("HTTP resp: ", resp)
-// resp.Body.Close()
-// }
-// }
-// return false
-// }
-
-// // Send a http request with json (json may be nil)
-// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
-// // initialize http client
-// client := &http.Client{}
-
-// // set the HTTP method, url, and request body
-// var req *http.Request
-// var err error
-// if json == nil {
-// req, err = http.NewRequest(method, url, http.NoBody)
-// } else {
-// req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
-// req.Header.Set("Content-Type", "application/json; charset=utf-8")
-// }
-// if err != nil {
-// log.Error("Cannot create http request method: ", method, " url: ", url)
-// return false
-// }
-
-// useAuth = false
-// if useAuth {
-// token, err := fetch_token()
-// if err != nil {
-// log.Error("Cannot fetch token for http request: ", err)
-// return false
-// }
-// req.Header.Add("Authorization", "Bearer "+token.TokenValue)
-// }
-// log.Debug("HTTP request: ", req)
-
-// b, berr := io.ReadAll(req.Body)
-// if berr == nil {
-// log.Debug("HTTP request body length: ", len(b))
-// } else {
-// log.Debug("HTTP request - cannot check body length: ", berr)
-// }
-// if json == nil {
-// log.Debug("HTTP request null json")
-// } else {
-// log.Debug("HTTP request json: ", string(json))
-// }
-// requestDump, cerr := httputil.DumpRequestOut(req, true)
-// if cerr != nil {
-// fmt.Println(cerr)
-// }
-// fmt.Println(string(requestDump))
-
-// retries := 1
-// if retry {
-// retries = 5
-// }
-// sleep_time := 1
-// for i := retries; i > 0; i-- {
-// resp, err2 := client.Do(req)
-// if err2 != nil {
-// log.Error("Http request error: ", err2)
-// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i)
-
-// time.Sleep(time.Duration(sleep_time) * time.Second)
-// sleep_time = 2 * sleep_time
-// } else {
-// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
-// log.Debug("Accepted http status: ", resp.StatusCode)
-// defer resp.Body.Close()
-// return true
-// }
-// }
-// }
-// return false
-// }
-
func fetch_token() (*kafka.OAuthBearerToken, error) {
log.Debug("Get token inline")
conf := &clientcredentials.Config{
log.Debug("=====================================================")
log.Debug("Expiration: ", token.Expiry)
t := token.Expiry
- // t := token.Expiry.Add(-time.Minute)
- // log.Debug("Modified expiration: ", t)
oauthBearerToken := kafka.OAuthBearerToken{
TokenValue: token.AccessToken,
Expiration: t,
return
}
- //TODO: Verify that job contains enough parameters...
-
if !job_found {
job_record = InfoJobRecord{}
job_record.job_info = t
jc.command = "ADD-FILTER"
- //TODO: Refactor
if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
fm := FilterMaps{}
fm.sourceNameMap = make(map[string]bool)
w.WriteHeader(http.StatusOK)
}
-// producer statictics, all jobs
+// producer statistics, all jobs
func statistics(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
case reader_ctrl := <-control_ch:
if reader_ctrl.command == "EXIT" {
log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
- //TODO: Stop consumer if present?
data_ch <- nil //Signal to job handler
running = false
return
}
}
default:
- //TODO: Handle these?
log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
}
go func() {
for {
- //maxDur := 1 * time.Second
for {
select {
case reader_ctrl := <-control_ch:
var kmsg KafkaPayload
kmsg.msg = e
- c.Commit() //TODO: Ok?
+ c.Commit()
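+ // Offset is committed explicitly since enable.auto.commit is disabled in the consumer config.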
- //TODO: Check for exception
data_ch <- &kmsg
stats.in_msg_cnt++
log.Debug("Reader msg: ", &kmsg)
default:
fmt.Printf("Ignored %v\n", e)
}
-
- // orig code
- // msg, err := c.ReadMessage(maxDur)
- // if err == nil {
- // var kmsg KafkaPayload
- // kmsg.msg = msg
-
- // c.Commit() //TODO: Ok?
-
- // //TODO: Check for exception
- // data_ch <- &kmsg
- // stats.in_msg_cnt++
- // log.Debug("Reader msg: ", &kmsg)
- // log.Debug("Reader - data_ch ", data_ch)
- // } else {
- // log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic, ", reason: ", err)
- // }
-
}
}
}
time.Sleep(1 * time.Second)
} else {
log.Debug("Kafka producer started")
- //defer kafka_producer.Close()
}
}
}
for {
select {
case evt := <-kafka_producer.Events():
- //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend?
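+ // Delivery reports arrive here as *kafka.Message events; a failed delivery has a non-nil TopicPartition.Error on the message.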
switch evt.(type) {
case *kafka.Message:
m := evt.(*kafka.Message)
case kmsg := <-data_ch:
if kmsg == nil {
event_chan <- 0
- // TODO: Close producer?
log.Info("Topic writer stopped by channel signal - start_topic_writer")
defer kafka_producer.Close()
return
}
}
if !msg_ok {
- //TODO: Retry sending msg?
log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
}
case <-time.After(1000 * time.Millisecond):
"client.id": cid,
"auto.offset.reset": "latest",
"enable.auto.commit": false,
- //"auto.commit.interval.ms": 5000,
}
} else {
log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
}
c, err := kafka.NewConsumer(&cm)
- //TODO: How to handle autocommit or commit message by message
- //TODO: Make arg to kafka configurable
-
if err != nil {
log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
return nil
}
- //c.Commit()
log.Info("Created kafka consumer for type: ", type_id, " OK")
return c
}
}
}
}
- //TODO: Sort processed file conversions in order (FIFO)
jobLimiterChan <- struct{}{}
go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
}
}
}
- //}()
}
func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
var reader io.Reader
- //TODO -> config
INPUTBUCKET := "ropfiles"
filename := ""
}
defer fi.Close()
reader = fi
- //} else if evt_data.ObjectStoreBucket != "" {
} else {
filename = evt_data.Name
if mc != nil {
start = time.Now()
var pmfile PMJsonFile
- //TODO: Fill in more values
pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
filters := make(map[string]Filter)
filterParams_list := make(map[string]FilterMaps)
- // ch_list := make(map[string]chan *KafkaPayload)
topic_list := make(map[string]string)
var mc *minio.Client
const mc_id = "mc_" + "start_job_json_file_data"
log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
switch job_ctl.command {
case "EXIT":
- //ignore cmd - handled by channel signal
case "ADD-FILTER":
- //TODO: Refactor...
filters[job_ctl.filter.JobId] = job_ctl.filter
log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
topic_list = tmp_topic_list
case "REMOVE-FILTER":
- //TODO: Refactor...
log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
tmp_filterParams_list := make(map[string]FilterMaps)
var reader io.Reader
- //TODO -> config
- //INPUTBUCKET := "json-file-ready"
INPUTBUCKET := "pm-files-json"
filename := ""
if objectstore == false {
log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
}
- // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
- // var pmfile PMJsonFile
- // start := time.Now()
- // err = jsoniter.Unmarshal(*data, &pmfile)
- // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
- // if err != nil {
- // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
- // return
- // }
for k, v := range filterList {
var pmfile PMJsonFile
log.Debug("measObjClassMap:", v.measObjClassMap)
log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
log.Debug("measTypesMap:", v.measTypesMap)
- //BMX if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
+
b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
if b == nil {
log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
return
}
kmsg.msg.Value = *b
- //BMX}
-
- // if outputCompression == "json.gz" {
- // start := time.Now()
- // var buf bytes.Buffer
- // err := gzipWrite(&buf, &kmsg.msg.Value)
- // if err != nil {
- // log.Error("Cannot compress file/obj ", filename, "for job: ", job_id, " - discarding message, error details", err)
- // return
-
- // }
- // kmsg.msg.Value = buf.Bytes()
- // log.Debug("Compress file/obj ", filename, "for job: ", job_id, " time:", time.Since(start).String())
- // }
+
kmsg.topic = topic_list[k]
kmsg.jobid = k
return data
}
-// func json_pm_filter(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
-
-// filter_req := true
-// start := time.Now()
-// if len(sourceNameMap) != 0 {
-// if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
-// filter_req = false
-// return nil
-// }
-// }
-// if filter_req {
-// modified := false
-// var temp_mil []MeasInfoList
-// for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
-
-// check_cntr := false
-// var cnt_flags []bool
-// if len(measTypesMap) > 0 {
-// c_cntr := 0
-// var temp_mtl []string
-// for _, v := range zz.MeasTypes.SMeasTypesList {
-// if measTypesMap[v] {
-// cnt_flags = append(cnt_flags, true)
-// c_cntr++
-// temp_mtl = append(temp_mtl, v)
-// } else {
-// cnt_flags = append(cnt_flags, false)
-// }
-// }
-// if c_cntr > 0 {
-// check_cntr = true
-// zz.MeasTypes.SMeasTypesList = temp_mtl
-// } else {
-// modified = true
-// continue
-// }
-// }
-// keep := false
-// var temp_mvl []MeasValues
-// for _, yy := range zz.MeasValuesList {
-// keep_class := false
-// keep_inst := false
-// keep_cntr := false
-
-// dna := strings.Split(yy.MeasObjInstID, ",")
-// instName := dna[len(dna)-1]
-// cls := strings.Split(dna[len(dna)-1], "=")[0]
-
-// if len(measObjClassMap) > 0 {
-// if measObjClassMap[cls] {
-// keep_class = true
-// }
-// } else {
-// keep_class = true
-// }
-
-// if len(measObjInstIdsMap) > 0 {
-// if measObjInstIdsMap[instName] {
-// keep_inst = true
-// }
-// } else {
-// keep_inst = true
-// }
-
-// if check_cntr {
-// var temp_mrl []MeasResults
-// cnt_p := 1
-// for _, v := range yy.MeasResultsList {
-// if cnt_flags[v.P-1] {
-// v.P = cnt_p
-// cnt_p++
-// temp_mrl = append(temp_mrl, v)
-// }
-// }
-// yy.MeasResultsList = temp_mrl
-// keep_cntr = true
-// } else {
-// keep_cntr = true
-// }
-// if keep_class && keep_cntr && keep_inst {
-// keep = true
-// temp_mvl = append(temp_mvl, yy)
-// }
-// }
-// if keep {
-// zz.MeasValuesList = temp_mvl
-// temp_mil = append(temp_mil, zz)
-// modified = true
-// }
-
-// }
-// //Only if modified
-// if modified {
-// if len(temp_mil) == 0 {
-// log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
-// return nil
-// }
-// data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
-// }
-// }
-// log.Debug("Filter: ", time.Since(start).String())
-
-// start = time.Now()
-// j, err := jsoniter.Marshal(&data)
-
-// log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
-
-// if err != nil {
-// log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
-// return nil
-// }
-
-// log.Debug("Filtered json obj: ", resource, " len: ", len(j))
-// return &j
-// }
-
func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
log.Info("Type job", type_id, " started")
case "EXIT":
- //ignore cmd - handled by channel signal
case "ADD-FILTER":
- //TODO: Refactor...
+
filters[job_ctl.filter.JobId] = job_ctl.filter
log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
log.Debug(job_ctl.filter)
influx_job_params = tmp_influx_job_params
case "REMOVE-FILTER":
- //TODO: Refactor...
+
log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
tmp_filterParams_list := make(map[string]FilterMaps)
}
}
}
- //TODO: Sort processed file conversions in order (FIFO)
+
jobLimiterChan <- struct{}{}
go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
var reader io.Reader
- //TODO -> config
- //INPUTBUCKET := "json-file-ready"
INPUTBUCKET := "pm-files-json"
filename := ""
if objectstore == false {
log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
}
- // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
- // var pmfile PMJsonFile
- // start := time.Now()
- // err = jsoniter.Unmarshal(*data, &pmfile)
- // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
- // if err != nil {
- // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
- // return
- // }
for k, v := range filterList {
var pmfile PMJsonFile
client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
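+ // WriteAPIBlocking writes each point synchronously, so no asynchronous buffer needs flushing when the job ends.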
- // fmt.Println(pmfile.Event.CommonEventHeader.StartEpochMicrosec)
- // tUnix := pmfile.Event.CommonEventHeader.StartEpochMicrosec / int64(time.Millisecond)
- // tUnixNanoRemainder := (pmfile.Event.CommonEventHeader.StartEpochMicrosec % int64(time.Millisecond)) * int64(time.Microsecond)
- // timeT := time.Unix(tUnix, tUnixNanoRemainder)
- // fmt.Println(timeT)
- // fmt.Println("======================")
for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
ctr_names := make(map[string]string)
for cni, cn := range zz.MeasTypes.SMeasTypesList {
- ctr_names[string(cni+1)] = cn
+ ctr_names[strconv.Itoa(cni+1)] = cn
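+ // strconv.Itoa yields the decimal string of the 1-based counter index; string(cni+1) produced the rune with that code point instead.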
}
for _, xx := range zz.MeasValuesList {
log.Debug("Measurement: ", xx.MeasObjInstID)
p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
for _, yy := range xx.MeasResultsList {
- pi := string(yy.P)
+ pi := strconv.Itoa(yy.P)
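+ // Same conversion as for ctr_names above: P is the 1-based counter position used as the map key.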
pv := yy.SValue
pn := ctr_names[pi]
log.Debug("Counter: ", pn, " Value: ", pv)